mirror of https://github.com/apache/nifi.git
NIFI-11489 Removed Deprecated Record Writer and Schema Registry methods
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #7199.
This commit is contained in:
parent
ee03db0e8f
commit
7906e3424c
|
@ -32,9 +32,5 @@
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-record</artifactId>
|
<artifactId>nifi-record</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.nifi</groupId>
|
|
||||||
<artifactId>nifi-deprecation-log</artifactId>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -22,7 +22,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
public interface RecordSchemaCacheService extends ControllerService {
|
public interface RecordSchemaCacheService extends ControllerService {
|
||||||
public static final String CACHE_IDENTIFIER_ATTRIBUTE = "schema.cache.identifier";
|
String CACHE_IDENTIFIER_ATTRIBUTE = "schema.cache.identifier";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Updates the cache to include the given Record Schema and returns an identifier
|
* Updates the cache to include the given Record Schema and returns an identifier
|
||||||
|
|
|
@ -19,12 +19,9 @@ package org.apache.nifi.serialization;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.nifi.controller.ControllerService;
|
import org.apache.nifi.controller.ControllerService;
|
||||||
import org.apache.nifi.deprecation.log.DeprecationLogger;
|
|
||||||
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
|
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||||
|
@ -45,11 +42,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
* In this case, if a RecordSchema is known and already available when calling {@link #getSchema(Map, RecordSchema)} method,
|
* In this case, if a RecordSchema is known and already available when calling {@link #getSchema(Map, RecordSchema)} method,
|
||||||
* the schema should be specified so that it can be reused.
|
* the schema should be specified so that it can be reused.
|
||||||
* </p>
|
* </p>
|
||||||
*
|
|
||||||
* <p>
|
|
||||||
* PLEASE NOTE: This interface is still considered 'unstable' and may change in a non-backward-compatible
|
|
||||||
* manner between minor or incremental releases of NiFi.
|
|
||||||
* </p>
|
|
||||||
*/
|
*/
|
||||||
public interface RecordSetWriterFactory extends ControllerService {
|
public interface RecordSetWriterFactory extends ControllerService {
|
||||||
/**
|
/**
|
||||||
|
@ -66,35 +58,6 @@ public interface RecordSetWriterFactory extends ControllerService {
|
||||||
*/
|
*/
|
||||||
RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException;
|
RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException;
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>
|
|
||||||
* Creates a new RecordSetWriter that is capable of writing record contents to an OutputStream.
|
|
||||||
* </p>
|
|
||||||
*
|
|
||||||
* @param logger the logger to use when logging information. This is passed in, rather than using the logger of the Controller Service
|
|
||||||
* because it allows messages to be logged for the component that is calling this Controller Service.
|
|
||||||
* @param schema the schema that will be used for writing records
|
|
||||||
* @param out the OutputStream to write to
|
|
||||||
*
|
|
||||||
* @return a RecordSetWriter that can write record sets to an OutputStream
|
|
||||||
* @throws IOException if unable to read from the given InputStream
|
|
||||||
*
|
|
||||||
* @deprecated Use {@link #createWriter(ComponentLog, RecordSchema, OutputStream, FlowFile)} or {@link #createWriter(ComponentLog, RecordSchema, OutputStream, Map)} instead.
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
default RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
|
|
||||||
final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(getClass());
|
|
||||||
final String deprecatedMethod = "createWriter(ComponentLog, RecordSchema, OutputStream)";
|
|
||||||
final String replacementMethod = "createWriter(ComponentLog, RecordSchema, OutputStream, FlowFile)";
|
|
||||||
deprecationLogger.warn("{}[id={}] {} should be replaced with {}",
|
|
||||||
getClass().getSimpleName(),
|
|
||||||
getIdentifier(),
|
|
||||||
deprecatedMethod,
|
|
||||||
replacementMethod
|
|
||||||
);
|
|
||||||
return createWriter(logger, schema, out, Collections.emptyMap());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* Creates a new RecordSetWriter that is capable of writing record contents to an OutputStream.
|
* Creates a new RecordSetWriter that is capable of writing record contents to an OutputStream.
|
||||||
|
|
|
@ -167,11 +167,6 @@ public class TestRecordSetWriterLookup {
|
||||||
return new SimpleRecordSchema(SchemaIdentifier.builder().name(name).build());
|
return new SimpleRecordSchema(SchemaIdentifier.builder().name(name).build());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
|
|
||||||
return new MockRecordSetWriter(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables) throws SchemaNotFoundException, IOException {
|
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables) throws SchemaNotFoundException, IOException {
|
||||||
return new MockRecordSetWriter(name);
|
return new MockRecordSetWriter(name);
|
||||||
|
|
|
@ -38,7 +38,7 @@ import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.Collections;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
@ -237,9 +237,9 @@ public class TestXMLRecordSetWriter {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out)
|
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> attributes)
|
||||||
throws SchemaNotFoundException, IOException {
|
throws SchemaNotFoundException, IOException {
|
||||||
return super.createWriter(logger, this.recordSchema, out, Collections.emptyMap());
|
return super.createWriter(logger, this.recordSchema, out, attributes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,9 +31,5 @@
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-record</artifactId>
|
<artifactId>nifi-record</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.nifi</groupId>
|
|
||||||
<artifactId>nifi-deprecation-log</artifactId>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -17,8 +17,6 @@
|
||||||
package org.apache.nifi.schemaregistry.services;
|
package org.apache.nifi.schemaregistry.services;
|
||||||
|
|
||||||
import org.apache.nifi.controller.ControllerService;
|
import org.apache.nifi.controller.ControllerService;
|
||||||
import org.apache.nifi.deprecation.log.DeprecationLogger;
|
|
||||||
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
|
|
||||||
import org.apache.nifi.schema.access.SchemaField;
|
import org.apache.nifi.schema.access.SchemaField;
|
||||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||||
import org.apache.nifi.serialization.record.RecordSchema;
|
import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
|
@ -32,123 +30,6 @@ import java.util.Set;
|
||||||
* integrate with external Schema Registry
|
* integrate with external Schema Registry
|
||||||
*/
|
*/
|
||||||
public interface SchemaRegistry extends ControllerService {
|
public interface SchemaRegistry extends ControllerService {
|
||||||
|
|
||||||
/**
|
|
||||||
* @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead
|
|
||||||
*
|
|
||||||
* Retrieves and returns the textual representation of the schema based on
|
|
||||||
* the provided name of the schema available in Schema Registry.
|
|
||||||
*
|
|
||||||
* @return the text that corresponds to the latest version of the schema with the given name
|
|
||||||
*
|
|
||||||
* @throws IOException if unable to communicate with the backing store
|
|
||||||
* @throws SchemaNotFoundException if unable to find the schema with the given name
|
|
||||||
*/
|
|
||||||
default String retrieveSchemaText(String schemaName) throws IOException, SchemaNotFoundException {
|
|
||||||
final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(getClass());
|
|
||||||
final String deprecatedMethod = "retrieveSchemaText(schemaName)";
|
|
||||||
final String replacementMethod = "retrieveSchema(SchemaIdentifier)";
|
|
||||||
deprecationLogger.warn("{}[id={}] {} should be replaced with {}",
|
|
||||||
getClass().getSimpleName(),
|
|
||||||
getIdentifier(),
|
|
||||||
deprecatedMethod,
|
|
||||||
replacementMethod
|
|
||||||
);
|
|
||||||
|
|
||||||
final RecordSchema recordSchema = retrieveSchema(SchemaIdentifier.builder().name(schemaName).build());
|
|
||||||
if (recordSchema == null) {
|
|
||||||
throw new SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
|
|
||||||
}
|
|
||||||
return recordSchema.getSchemaText().get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead
|
|
||||||
*
|
|
||||||
* Retrieves the textual representation of the schema with the given ID and version
|
|
||||||
*
|
|
||||||
* @param schemaId the unique identifier for the desired schema
|
|
||||||
* @param version the version of the desired schema
|
|
||||||
* @return the textual representation of the schema with the given ID and version
|
|
||||||
*
|
|
||||||
* @throws IOException if unable to communicate with the backing store
|
|
||||||
* @throws SchemaNotFoundException if unable to find the schema with the given id and version
|
|
||||||
*/
|
|
||||||
default String retrieveSchemaText(long schemaId, int version) throws IOException, SchemaNotFoundException {
|
|
||||||
final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(getClass());
|
|
||||||
final String deprecatedMethod = "retrieveSchemaText(schemaId, version)";
|
|
||||||
final String replacementMethod = "retrieveSchema(SchemaIdentifier)";
|
|
||||||
deprecationLogger.warn("{}[id={}] {} should be replaced with {}",
|
|
||||||
getClass().getSimpleName(),
|
|
||||||
getIdentifier(),
|
|
||||||
deprecatedMethod,
|
|
||||||
replacementMethod
|
|
||||||
);
|
|
||||||
|
|
||||||
final RecordSchema recordSchema = retrieveSchema(SchemaIdentifier.builder().id(schemaId).version(version).build());
|
|
||||||
if (recordSchema == null) {
|
|
||||||
throw new SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
|
|
||||||
}
|
|
||||||
return recordSchema.getSchemaText().get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead
|
|
||||||
*
|
|
||||||
* Retrieves and returns the RecordSchema based on the provided name of the schema available in Schema Registry. The RecordSchema
|
|
||||||
* that is returned must have the Schema's name populated in its SchemaIdentifier. I.e., a call to
|
|
||||||
* {@link RecordSchema}.{@link RecordSchema#getIdentifier() getIdentifier()}.{@link SchemaIdentifier#getName() getName()}
|
|
||||||
* will always return an {@link java.util.Optional} that is not empty.
|
|
||||||
*
|
|
||||||
* @return the latest version of the schema with the given name, or <code>null</code> if no schema can be found with the given name.
|
|
||||||
* @throws SchemaNotFoundException if unable to find the schema with the given name
|
|
||||||
*/
|
|
||||||
default RecordSchema retrieveSchema(String schemaName) throws IOException, SchemaNotFoundException {
|
|
||||||
final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(getClass());
|
|
||||||
final String deprecatedMethod = "retrieveSchemaText(schemaName)";
|
|
||||||
final String replacementMethod = "retrieveSchema(SchemaIdentifier)";
|
|
||||||
deprecationLogger.warn("{}[id={}] {} should be replaced with {}",
|
|
||||||
getClass().getSimpleName(),
|
|
||||||
getIdentifier(),
|
|
||||||
deprecatedMethod,
|
|
||||||
replacementMethod
|
|
||||||
);
|
|
||||||
|
|
||||||
return retrieveSchema(SchemaIdentifier.builder().name(schemaName).build());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead
|
|
||||||
*
|
|
||||||
* Retrieves the schema with the given ID and version. The RecordSchema that is returned must have the Schema's identifier and version
|
|
||||||
* populated in its SchemaIdentifier. I.e., a call to
|
|
||||||
* {@link RecordSchema}.{@link RecordSchema#getIdentifier() getIdentifier()}.{@link SchemaIdentifier#getIdentifier() getIdentifier()}
|
|
||||||
* will always return an {@link java.util.Optional} that is not empty, as will a call to
|
|
||||||
* {@link RecordSchema}.{@link RecordSchema#getIdentifier() getIdentifier()}.{@link SchemaIdentifier#getVersion() getVersion()}.
|
|
||||||
*
|
|
||||||
* @param schemaId the unique identifier for the desired schema
|
|
||||||
* @param version the version of the desired schema
|
|
||||||
* @return the schema with the given ID and version or <code>null</code> if no schema
|
|
||||||
* can be found with the given ID and version
|
|
||||||
*
|
|
||||||
* @throws IOException if unable to communicate with the backing store
|
|
||||||
* @throws SchemaNotFoundException if unable to find the schema with the given id and version
|
|
||||||
*/
|
|
||||||
default RecordSchema retrieveSchema(long schemaId, int version) throws IOException, SchemaNotFoundException {
|
|
||||||
final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(getClass());
|
|
||||||
final String deprecatedMethod = "retrieveSchemaText(schemaId, version)";
|
|
||||||
final String replacementMethod = "retrieveSchema(SchemaIdentifier)";
|
|
||||||
deprecationLogger.warn("{}[id={}] {} should be replaced with {}",
|
|
||||||
getClass().getSimpleName(),
|
|
||||||
getIdentifier(),
|
|
||||||
deprecatedMethod,
|
|
||||||
replacementMethod
|
|
||||||
);
|
|
||||||
|
|
||||||
return retrieveSchema(SchemaIdentifier.builder().id(schemaId).version(version).build());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieves the schema based on the provided descriptor. The descriptor must contain and schemaIdentifier or name, but not both, along
|
* Retrieves the schema based on the provided descriptor. The descriptor must contain and schemaIdentifier or name, but not both, along
|
||||||
* with a version, and an optional branch name. For implementations that do not support branching, the branch name will be ignored.
|
* with a version, and an optional branch name. For implementations that do not support branching, the branch name will be ignored.
|
||||||
|
|
Loading…
Reference in New Issue