Remove Apache Pig from the tests (#7810)

* Remove Apache Pig from the tests

* Remove the Pig specific part

* Fix the Checkstyle issues

* Cleanup a bit

* Add an additional test

* Revert the abstract class
This commit is contained in:
Fokko Driesprong 2019-06-14 23:18:58 +02:00 committed by Clint Wylie
parent 3bee6adcf7
commit f581118f05
15 changed files with 312 additions and 215 deletions

View File

@ -38,7 +38,6 @@
<schemarepo.version>0.1.3</schemarepo.version> <schemarepo.version>0.1.3</schemarepo.version>
<confluent.version>3.0.1</confluent.version> <confluent.version>3.0.1</confluent.version>
<avro.version>1.8.2</avro.version> <avro.version>1.8.2</avro.version>
<pig.version>0.15.0</pig.version>
</properties> </properties>
<repositories> <repositories>
@ -160,51 +159,6 @@
<version>2.2.10</version> <version>2.2.10</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>${pig.version}</version>
<classifier>h2</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>piggybank</artifactId>
<version>${pig.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>org.apache.druid</groupId> <groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId> <artifactId>druid-processing</artifactId>

View File

@ -33,19 +33,16 @@ import java.util.List;
public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord> public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
{ {
private final ParseSpec parseSpec; private final ParseSpec parseSpec;
private final boolean fromPigAvroStorage;
private final ObjectFlattener<GenericRecord> avroFlattener; private final ObjectFlattener<GenericRecord> avroFlattener;
private final MapInputRowParser mapParser; private final MapInputRowParser mapParser;
@JsonCreator @JsonCreator
public AvroHadoopInputRowParser( public AvroHadoopInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec, @JsonProperty("parseSpec") ParseSpec parseSpec
@JsonProperty("fromPigAvroStorage") Boolean fromPigAvroStorage
) )
{ {
this.parseSpec = parseSpec; this.parseSpec = parseSpec;
this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage; this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false);
this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, false);
this.mapParser = new MapInputRowParser(parseSpec); this.mapParser = new MapInputRowParser(parseSpec);
} }
@ -62,15 +59,9 @@ public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
return parseSpec; return parseSpec;
} }
@JsonProperty
public boolean isFromPigAvroStorage()
{
return fromPigAvroStorage;
}
@Override @Override
public InputRowParser withParseSpec(ParseSpec parseSpec) public InputRowParser withParseSpec(ParseSpec parseSpec)
{ {
return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage); return new AvroHadoopInputRowParser(parseSpec);
} }
} }

View File

@ -48,7 +48,7 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
{ {
this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec"); this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec");
this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder"); this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder");
this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false); this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false);
this.mapParser = new MapInputRowParser(parseSpec); this.mapParser = new MapInputRowParser(parseSpec);
} }

View File

@ -19,12 +19,10 @@
package org.apache.druid.data.input.avro; package org.apache.druid.data.input.avro;
import com.google.common.collect.Lists;
import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option; import com.jayway.jsonpath.Option;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8; import org.apache.avro.util.Utf8;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
@ -41,7 +39,7 @@ import java.util.stream.Collectors;
public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<GenericRecord> public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<GenericRecord>
{ {
static final Configuration JSONPATH_CONFIGURATION = private static final Configuration JSONPATH_CONFIGURATION =
Configuration.builder() Configuration.builder()
.jsonProvider(new GenericAvroJsonProvider()) .jsonProvider(new GenericAvroJsonProvider())
.mappingProvider(new NotImplementedMappingProvider()) .mappingProvider(new NotImplementedMappingProvider())
@ -57,17 +55,17 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
Schema.Type.DOUBLE Schema.Type.DOUBLE
); );
public static boolean isPrimitive(Schema schema) private static boolean isPrimitive(Schema schema)
{ {
return ROOT_TYPES.contains(schema.getType()); return ROOT_TYPES.contains(schema.getType());
} }
public static boolean isPrimitiveArray(Schema schema) private static boolean isPrimitiveArray(Schema schema)
{ {
return schema.getType().equals(Schema.Type.ARRAY) && isPrimitive(schema.getElementType()); return schema.getType().equals(Schema.Type.ARRAY) && isPrimitive(schema.getElementType());
} }
public static boolean isOptionalPrimitive(Schema schema) private static boolean isOptionalPrimitive(Schema schema)
{ {
return schema.getType().equals(Schema.Type.UNION) && return schema.getType().equals(Schema.Type.UNION) &&
schema.getTypes().size() == 2 && schema.getTypes().size() == 2 &&
@ -79,7 +77,7 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
); );
} }
static boolean isFieldPrimitive(Schema.Field field) private static boolean isFieldPrimitive(Schema.Field field)
{ {
return isPrimitive(field.schema()) || return isPrimitive(field.schema()) ||
isPrimitiveArray(field.schema()) || isPrimitiveArray(field.schema()) ||
@ -87,12 +85,13 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
} }
private final boolean fromPigAvroStorage;
private final boolean binaryAsString; private final boolean binaryAsString;
public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binaryAsString) /**
* @param binaryAsString boolean to encode the byte[] as a string.
*/
public AvroFlattenerMaker(final boolean binaryAsString)
{ {
this.fromPigAvroStorage = fromPigAvroStorage;
this.binaryAsString = binaryAsString; this.binaryAsString = binaryAsString;
} }
@ -128,21 +127,16 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
private Object transformValue(final Object field) private Object transformValue(final Object field)
{ {
if (fromPigAvroStorage && field instanceof GenericData.Array) {
return Lists.transform((List) field, item -> String.valueOf(((GenericRecord) item).get(0)));
}
if (field instanceof ByteBuffer) { if (field instanceof ByteBuffer) {
if (binaryAsString) { if (binaryAsString) {
return StringUtils.fromUtf8(((ByteBuffer) field).array()); return StringUtils.fromUtf8(((ByteBuffer) field).array());
} else { } else {
return ((ByteBuffer) field).array(); return ((ByteBuffer) field).array();
} }
} } else if (field instanceof Utf8) {
if (field instanceof Utf8) {
return field.toString(); return field.toString();
} } else if (field instanceof List) {
if (field instanceof List) { return ((List<?>) field).stream().filter(Objects::nonNull).collect(Collectors.toList());
return ((List) field).stream().filter(Objects::nonNull).collect(Collectors.toList());
} }
return field; return field;
} }

View File

@ -38,18 +38,17 @@ public class AvroParsers
public static ObjectFlattener<GenericRecord> makeFlattener( public static ObjectFlattener<GenericRecord> makeFlattener(
final ParseSpec parseSpec, final ParseSpec parseSpec,
final boolean fromPigAvroStorage,
final boolean binaryAsString final boolean binaryAsString
) )
{ {
final JSONPathSpec flattenSpec; final JSONPathSpec flattenSpec;
if (parseSpec != null && (parseSpec instanceof AvroParseSpec)) { if (parseSpec instanceof AvroParseSpec) {
flattenSpec = ((AvroParseSpec) parseSpec).getFlattenSpec(); flattenSpec = ((AvroParseSpec) parseSpec).getFlattenSpec();
} else { } else {
flattenSpec = JSONPathSpec.DEFAULT; flattenSpec = JSONPathSpec.DEFAULT;
} }
return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(fromPigAvroStorage, binaryAsString)); return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(binaryAsString));
} }
public static List<InputRow> parseGenericRecord( public static List<InputRow> parseGenericRecord(

View File

@ -25,6 +25,8 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8; import org.apache.avro.util.Utf8;
import javax.annotation.Nullable;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -139,6 +141,7 @@ public class GenericAvroJsonProvider implements JsonProvider
} }
} }
@Nullable
@Override @Override
public Object getMapValue(final Object o, final String s) public Object getMapValue(final Object o, final String s)
{ {

View File

@ -32,6 +32,7 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Objects;
public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
{ {
@ -83,7 +84,7 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o; SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o;
return registry != null ? registry.equals(that.registry) : that.registry == null; return Objects.equals(registry, that.registry);
} }
@Override @Override

View File

@ -38,6 +38,7 @@ import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Collections; import java.util.Collections;
import java.util.Objects;
public class SchemaRepoBasedAvroBytesDecoder<SUBJECT, ID> implements AvroBytesDecoder public class SchemaRepoBasedAvroBytesDecoder<SUBJECT, ID> implements AvroBytesDecoder
{ {
@ -107,14 +108,10 @@ public class SchemaRepoBasedAvroBytesDecoder<SUBJECT, ID> implements AvroBytesDe
SchemaRepoBasedAvroBytesDecoder<?, ?> that = (SchemaRepoBasedAvroBytesDecoder<?, ?>) o; SchemaRepoBasedAvroBytesDecoder<?, ?> that = (SchemaRepoBasedAvroBytesDecoder<?, ?>) o;
if (subjectAndIdConverter != null if (!Objects.equals(subjectAndIdConverter, that.subjectAndIdConverter)) {
? !subjectAndIdConverter.equals(that.subjectAndIdConverter)
: that.subjectAndIdConverter != null) {
return false; return false;
} }
return !(schemaRepository != null return Objects.equals(schemaRepository, that.schemaRepository);
? !schemaRepository.equals(that.schemaRepository)
: that.schemaRepository != null);
} }
@Override @Override

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.schemarepo.client.Avro1124RESTRepositoryClient; import org.schemarepo.client.Avro1124RESTRepositoryClient;
import java.util.Objects;
public class Avro1124RESTRepositoryClientWrapper extends Avro1124RESTRepositoryClient public class Avro1124RESTRepositoryClientWrapper extends Avro1124RESTRepositoryClient
{ {
private final String url; private final String url;
@ -60,7 +62,7 @@ public class Avro1124RESTRepositoryClientWrapper extends Avro1124RESTRepositoryC
Avro1124RESTRepositoryClientWrapper that = (Avro1124RESTRepositoryClientWrapper) o; Avro1124RESTRepositoryClientWrapper that = (Avro1124RESTRepositoryClientWrapper) o;
return !(url != null ? !url.equals(that.url) : that.url != null); return Objects.equals(url, that.url);
} }
@Override @Override

View File

@ -27,6 +27,7 @@ import org.schemarepo.api.converter.IdentityConverter;
import org.schemarepo.api.converter.IntegerConverter; import org.schemarepo.api.converter.IntegerConverter;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Objects;
/** /**
* This implementation using injected Kafka topic name as subject name, and an integer as schema id. Before sending avro * This implementation using injected Kafka topic name as subject name, and an integer as schema id. Before sending avro
@ -88,7 +89,7 @@ public class Avro1124SubjectAndIdConverter implements SubjectAndIdConverter<Stri
Avro1124SubjectAndIdConverter converter = (Avro1124SubjectAndIdConverter) o; Avro1124SubjectAndIdConverter converter = (Avro1124SubjectAndIdConverter) o;
return !(topic != null ? !topic.equals(converter.topic) : converter.topic != null); return Objects.equals(topic, converter.topic);
} }

View File

@ -21,7 +21,6 @@ package org.apache.druid.data.input;
import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Closeables;
import com.google.common.io.Files; import com.google.common.io.Files;
import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter; import org.apache.avro.file.DataFileWriter;
@ -29,12 +28,7 @@ import org.apache.avro.file.FileReader;
import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.io.FileUtils;
import org.apache.druid.data.input.avro.AvroExtensionsModule; import org.apache.druid.data.input.avro.AvroExtensionsModule;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -59,102 +53,59 @@ public class AvroHadoopInputRowParserTest
} }
@Test @Test
public void testParseNotFromPigAvroStorage() throws IOException public void testParseNotFromSpark() throws IOException
{ {
testParse(buildSomeAvroDatum(), false); testParse(buildSomeAvroDatum());
} }
@Test @Test
public void testParseFromPiggyBankAvroStorage() throws IOException public void testParseFromSpark() throws IOException
{ {
testParse(buildPiggyBankAvro(), false); testParse(buildAvroFromFile());
} }
@Test private void testParse(GenericRecord record) throws IOException
public void testParseFromPigAvroStorage() throws IOException
{ {
testParse(buildPigAvro(), true); AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(PARSE_SPEC);
}
private void testParse(GenericRecord record, boolean fromPigAvroStorage) throws IOException
{
AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(PARSE_SPEC, fromPigAvroStorage);
AvroHadoopInputRowParser parser2 = jsonMapper.readValue( AvroHadoopInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsBytes(parser), jsonMapper.writeValueAsBytes(parser),
AvroHadoopInputRowParser.class AvroHadoopInputRowParser.class
); );
InputRow inputRow = parser2.parseBatch(record).get(0); InputRow inputRow = parser2.parseBatch(record).get(0);
assertInputRowCorrect(inputRow, DIMENSIONS, fromPigAvroStorage); assertInputRowCorrect(inputRow, DIMENSIONS);
} }
private static GenericRecord buildAvroFromFile() throws IOException
public static GenericRecord buildPigAvro() throws IOException
{ {
return buildPigAvro(buildSomeAvroDatum(), "AvroStorage", "AvroStorage"); return buildAvroFromFile(
} buildSomeAvroDatum()
public static GenericRecord buildPiggyBankAvro() throws IOException
{
return buildPigAvro(
buildSomeAvroDatum(),
"org.apache.pig.piggybank.storage.avro.AvroStorage",
"org.apache.pig.piggybank.storage.avro.AvroStorage('field7','{\"type\":\"map\",\"values\":\"int\"}','field8','{\"type\":\"map\",\"values\":\"string\"}')"
); );
} }
private static GenericRecord buildPigAvro(GenericRecord datum, String inputStorage, String outputStorage) private static GenericRecord buildAvroFromFile(GenericRecord datum)
throws IOException throws IOException
{ {
final File tmpDir = Files.createTempDir(); final File tmpDir = Files.createTempDir();
FileReader<GenericRecord> reader = null;
PigServer pigServer = null; // 0. write avro object into temp file.
try { File someAvroDatumFile = new File(tmpDir, "someAvroDatum.avro");
// 0. write avro object into temp file. try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(
File someAvroDatumFile = new File(tmpDir, "someAvroDatum.avro"); new SpecificDatumWriter<>()
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>( )) {
new SpecificDatumWriter<>()
);
dataFileWriter.create(SomeAvroDatum.getClassSchema(), someAvroDatumFile); dataFileWriter.create(SomeAvroDatum.getClassSchema(), someAvroDatumFile);
dataFileWriter.append(datum); dataFileWriter.append(datum);
dataFileWriter.close();
// 1. read avro files into Pig
pigServer = new PigServer(ExecType.LOCAL);
pigServer.registerQuery(
StringUtils.format(
"A = LOAD '%s' USING %s;",
someAvroDatumFile,
inputStorage
)
);
// 2. write new avro file using AvroStorage
File outputDir = new File(tmpDir, "output");
ExecJob job = pigServer.store("A", String.valueOf(outputDir), outputStorage);
while (!job.hasCompleted()) {
Thread.sleep(100);
}
assert (job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
// 3. read avro object from AvroStorage
reader = DataFileReader.openReader(
new File(outputDir, "part-m-00000.avro"),
new GenericDatumReader<GenericRecord>()
);
return reader.next();
} }
catch (InterruptedException e) {
throw new RuntimeException(e); final GenericRecord record;
} // 3. read avro object from AvroStorage
finally { try (FileReader<GenericRecord> reader = DataFileReader.openReader(
if (pigServer != null) { someAvroDatumFile,
pigServer.shutdown(); new GenericDatumReader<>()
} )) {
Closeables.close(reader, true); record = reader.next();
FileUtils.deleteDirectory(tmpDir);
} }
return record;
} }
} }

View File

@ -55,6 +55,7 @@ import org.schemarepo.api.converter.AvroSchemaConverter;
import org.schemarepo.api.converter.IdentityConverter; import org.schemarepo.api.converter.IdentityConverter;
import org.schemarepo.api.converter.IntegerConverter; import org.schemarepo.api.converter.IntegerConverter;
import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
@ -71,20 +72,20 @@ import static org.junit.Assert.assertEquals;
public class AvroStreamInputRowParserTest public class AvroStreamInputRowParserTest
{ {
public static final String EVENT_TYPE = "eventType"; private static final String EVENT_TYPE = "eventType";
public static final String ID = "id"; private static final String ID = "id";
public static final String SOME_OTHER_ID = "someOtherId"; private static final String SOME_OTHER_ID = "someOtherId";
public static final String IS_VALID = "isValid"; private static final String IS_VALID = "isValid";
public static final String TOPIC = "aTopic"; private static final String TOPIC = "aTopic";
public static final String EVENT_TYPE_VALUE = "type-a"; private static final String EVENT_TYPE_VALUE = "type-a";
public static final long ID_VALUE = 1976491L; private static final long ID_VALUE = 1976491L;
public static final long SOME_OTHER_ID_VALUE = 6568719896L; private static final long SOME_OTHER_ID_VALUE = 6568719896L;
public static final float SOME_FLOAT_VALUE = 0.23555f; private static final float SOME_FLOAT_VALUE = 0.23555f;
public static final int SOME_INT_VALUE = 1; private static final int SOME_INT_VALUE = 1;
public static final long SOME_LONG_VALUE = 679865987569912369L; private static final long SOME_LONG_VALUE = 679865987569912369L;
public static final DateTime DATE_TIME = new DateTime(2015, 10, 25, 19, 30, ISOChronology.getInstanceUTC()); private static final DateTime DATE_TIME = new DateTime(2015, 10, 25, 19, 30, ISOChronology.getInstanceUTC());
public static final List<String> DIMENSIONS = Arrays.asList(EVENT_TYPE, ID, SOME_OTHER_ID, IS_VALID); static final List<String> DIMENSIONS = Arrays.asList(EVENT_TYPE, ID, SOME_OTHER_ID, IS_VALID);
public static final List<String> DIMENSIONS_SCHEMALESS = Arrays.asList( private static final List<String> DIMENSIONS_SCHEMALESS = Arrays.asList(
"nested", "nested",
SOME_OTHER_ID, SOME_OTHER_ID,
"someStringArray", "someStringArray",
@ -98,7 +99,7 @@ public class AvroStreamInputRowParserTest
"someInt", "someInt",
"timestamp" "timestamp"
); );
public static final AvroParseSpec PARSE_SPEC = new AvroParseSpec( static final AvroParseSpec PARSE_SPEC = new AvroParseSpec(
new TimestampSpec("nested", "millis", null), new TimestampSpec("nested", "millis", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(DIMENSIONS), Collections.emptyList(), null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(DIMENSIONS), Collections.emptyList(), null),
new JSONPathSpec( new JSONPathSpec(
@ -108,7 +109,7 @@ public class AvroStreamInputRowParserTest
) )
) )
); );
public static final AvroParseSpec PARSE_SPEC_SCHEMALESS = new AvroParseSpec( private static final AvroParseSpec PARSE_SPEC_SCHEMALESS = new AvroParseSpec(
new TimestampSpec("nested", "millis", null), new TimestampSpec("nested", "millis", null),
new DimensionsSpec(null, null, null), new DimensionsSpec(null, null, null),
new JSONPathSpec( new JSONPathSpec(
@ -118,19 +119,19 @@ public class AvroStreamInputRowParserTest
) )
) )
); );
public static final MyFixed SOME_FIXED_VALUE = new MyFixed(ByteBuffer.allocate(16).array()); private static final MyFixed SOME_FIXED_VALUE = new MyFixed(ByteBuffer.allocate(16).array());
private static final long SUB_LONG_VALUE = 1543698L; private static final long SUB_LONG_VALUE = 1543698L;
private static final int SUB_INT_VALUE = 4892; private static final int SUB_INT_VALUE = 4892;
public static final MySubRecord SOME_RECORD_VALUE = MySubRecord.newBuilder() private static final MySubRecord SOME_RECORD_VALUE = MySubRecord.newBuilder()
.setSubInt(SUB_INT_VALUE) .setSubInt(SUB_INT_VALUE)
.setSubLong(SUB_LONG_VALUE) .setSubLong(SUB_LONG_VALUE)
.build(); .build();
public static final List<CharSequence> SOME_STRING_ARRAY_VALUE = Arrays.asList((CharSequence) "8", "4", "2", "1"); private static final List<CharSequence> SOME_STRING_ARRAY_VALUE = Arrays.asList("8", "4", "2", "1");
public static final List<Integer> SOME_INT_ARRAY_VALUE = Arrays.asList(1, 2, 4, 8); private static final List<Integer> SOME_INT_ARRAY_VALUE = Arrays.asList(1, 2, 4, 8);
public static final Map<CharSequence, Integer> SOME_INT_VALUE_MAP_VALUE = Maps.asMap( private static final Map<CharSequence, Integer> SOME_INT_VALUE_MAP_VALUE = Maps.asMap(
new HashSet<CharSequence>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, Integer>() new HashSet<>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, Integer>()
{ {
@Nullable @Nonnull
@Override @Override
public Integer apply(@Nullable CharSequence input) public Integer apply(@Nullable CharSequence input)
{ {
@ -138,10 +139,10 @@ public class AvroStreamInputRowParserTest
} }
} }
); );
public static final Map<CharSequence, CharSequence> SOME_STRING_VALUE_MAP_VALUE = Maps.asMap( private static final Map<CharSequence, CharSequence> SOME_STRING_VALUE_MAP_VALUE = Maps.asMap(
new HashSet<CharSequence>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, CharSequence>() new HashSet<>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, CharSequence>()
{ {
@Nullable @Nonnull
@Override @Override
public CharSequence apply(@Nullable CharSequence input) public CharSequence apply(@Nullable CharSequence input)
{ {
@ -149,8 +150,8 @@ public class AvroStreamInputRowParserTest
} }
} }
); );
public static final String SOME_UNION_VALUE = "string as union"; private static final String SOME_UNION_VALUE = "string as union";
public static final ByteBuffer SOME_BYTES_VALUE = ByteBuffer.allocate(8); private static final ByteBuffer SOME_BYTES_VALUE = ByteBuffer.allocate(8);
private static final Pattern BRACES_AND_SPACE = Pattern.compile("[{} ]"); private static final Pattern BRACES_AND_SPACE = Pattern.compile("[{} ]");
@ -173,7 +174,7 @@ public class AvroStreamInputRowParserTest
Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io"); Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io");
AvroStreamInputRowParser parser = new AvroStreamInputRowParser( AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
PARSE_SPEC, PARSE_SPEC,
new SchemaRepoBasedAvroBytesDecoder<String, Integer>(new Avro1124SubjectAndIdConverter(TOPIC), repository) new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
); );
ByteBufferInputRowParser parser2 = jsonMapper.readValue( ByteBufferInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(parser), jsonMapper.writeValueAsString(parser),
@ -190,7 +191,7 @@ public class AvroStreamInputRowParserTest
Repository repository = new InMemoryRepository(null); Repository repository = new InMemoryRepository(null);
AvroStreamInputRowParser parser = new AvroStreamInputRowParser( AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
PARSE_SPEC, PARSE_SPEC,
new SchemaRepoBasedAvroBytesDecoder<String, Integer>(new Avro1124SubjectAndIdConverter(TOPIC), repository) new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
); );
ByteBufferInputRowParser parser2 = jsonMapper.readValue( ByteBufferInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(parser), jsonMapper.writeValueAsString(parser),
@ -221,7 +222,7 @@ public class AvroStreamInputRowParserTest
InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
assertInputRowCorrect(inputRow, DIMENSIONS, false); assertInputRowCorrect(inputRow, DIMENSIONS);
} }
@Test @Test
@ -231,7 +232,7 @@ public class AvroStreamInputRowParserTest
Repository repository = new InMemoryRepository(null); Repository repository = new InMemoryRepository(null);
AvroStreamInputRowParser parser = new AvroStreamInputRowParser( AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
PARSE_SPEC_SCHEMALESS, PARSE_SPEC_SCHEMALESS,
new SchemaRepoBasedAvroBytesDecoder<String, Integer>(new Avro1124SubjectAndIdConverter(TOPIC), repository) new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
); );
ByteBufferInputRowParser parser2 = jsonMapper.readValue( ByteBufferInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(parser), jsonMapper.writeValueAsString(parser),
@ -244,7 +245,7 @@ public class AvroStreamInputRowParserTest
// encode schema id // encode schema id
Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC); Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC);
TypedSchemaRepository<Integer, Schema, String> repositoryClient = new TypedSchemaRepository<Integer, Schema, String>( TypedSchemaRepository<Integer, Schema, String> repositoryClient = new TypedSchemaRepository<>(
repository, repository,
new IntegerConverter(), new IntegerConverter(),
new AvroSchemaConverter(), new AvroSchemaConverter(),
@ -253,19 +254,20 @@ public class AvroStreamInputRowParserTest
Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema()); Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema());
ByteBuffer byteBuffer = ByteBuffer.allocate(4); ByteBuffer byteBuffer = ByteBuffer.allocate(4);
converter.putSubjectAndId(id, byteBuffer); converter.putSubjectAndId(id, byteBuffer);
ByteArrayOutputStream out = new ByteArrayOutputStream(); try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
out.write(byteBuffer.array()); out.write(byteBuffer.array());
// encode data // encode data
DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(someAvroDatum.getSchema()); DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(someAvroDatum.getSchema());
// write avro datum to bytes // write avro datum to bytes
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS, false); assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS);
}
} }
public static void assertInputRowCorrect(InputRow inputRow, List<String> expectedDimensions, boolean isFromPigAvro) static void assertInputRowCorrect(InputRow inputRow, List<String> expectedDimensions)
{ {
assertEquals(expectedDimensions, inputRow.getDimensions()); assertEquals(expectedDimensions, inputRow.getDimensions());
assertEquals(1543698L, inputRow.getTimestampFromEpoch()); assertEquals(1543698L, inputRow.getTimestampFromEpoch());
@ -316,9 +318,7 @@ public class AvroStreamInputRowParserTest
); );
assertEquals(Collections.singletonList(SOME_UNION_VALUE), inputRow.getDimension("someUnion")); assertEquals(Collections.singletonList(SOME_UNION_VALUE), inputRow.getDimension("someUnion"));
assertEquals(Collections.emptyList(), inputRow.getDimension("someNull")); assertEquals(Collections.emptyList(), inputRow.getDimension("someNull"));
if (isFromPigAvro) { assertEquals(SOME_FIXED_VALUE, inputRow.getRaw("someFixed"));
assertEquals(String.valueOf(SOME_FIXED_VALUE), Arrays.toString((byte[]) inputRow.getRaw("someFixed")));
}
assertEquals( assertEquals(
Arrays.toString(SOME_BYTES_VALUE.array()), Arrays.toString(SOME_BYTES_VALUE.array()),
Arrays.toString((byte[]) (inputRow.getRaw("someBytes"))) Arrays.toString((byte[]) (inputRow.getRaw("someBytes")))

View File

@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.avro;
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.SomeAvroDatum;
import org.junit.Assert;
import org.junit.Test;
public class AvroFlattenerMakerTest
{
@Test
public void getRootField()
{
final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum();
final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false);
Assert.assertEquals(
record.timestamp,
flattener.getRootField(record, "timestamp")
);
Assert.assertEquals(
record.eventType,
flattener.getRootField(record, "eventType")
);
Assert.assertEquals(
record.id,
flattener.getRootField(record, "id")
);
Assert.assertEquals(
record.someOtherId,
flattener.getRootField(record, "someOtherId")
);
Assert.assertEquals(
record.isValid,
flattener.getRootField(record, "isValid")
);
Assert.assertEquals(
record.someIntArray,
flattener.getRootField(record, "someIntArray")
);
Assert.assertEquals(
record.someStringArray,
flattener.getRootField(record, "someStringArray")
);
Assert.assertEquals(
record.someIntValueMap,
flattener.getRootField(record, "someIntValueMap")
);
Assert.assertEquals(
record.someStringValueMap,
flattener.getRootField(record, "someStringValueMap")
);
Assert.assertEquals(
record.someUnion,
flattener.getRootField(record, "someUnion")
);
Assert.assertEquals(
record.someNull,
flattener.getRootField(record, "someNull")
);
Assert.assertEquals(
record.someFixed,
flattener.getRootField(record, "someFixed")
);
Assert.assertEquals(
// Casted to an array by transformValue
record.someBytes.array(),
flattener.getRootField(record, "someBytes")
);
Assert.assertEquals(
record.someEnum,
flattener.getRootField(record, "someEnum")
);
Assert.assertEquals(
record.someRecord,
flattener.getRootField(record, "someRecord")
);
Assert.assertEquals(
record.someLong,
flattener.getRootField(record, "someLong")
);
Assert.assertEquals(
record.someInt,
flattener.getRootField(record, "someInt")
);
Assert.assertEquals(
record.someFloat,
flattener.getRootField(record, "someFloat")
);
}
@Test
public void makeJsonPathExtractor()
{
final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum();
final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false);
Assert.assertEquals(
record.timestamp,
flattener.makeJsonPathExtractor("$.timestamp").apply(record)
);
Assert.assertEquals(
record.eventType,
flattener.makeJsonPathExtractor("$.eventType").apply(record)
);
Assert.assertEquals(
record.id,
flattener.makeJsonPathExtractor("$.id").apply(record)
);
Assert.assertEquals(
record.someOtherId,
flattener.makeJsonPathExtractor("$.someOtherId").apply(record)
);
Assert.assertEquals(
record.isValid,
flattener.makeJsonPathExtractor("$.isValid").apply(record)
);
Assert.assertEquals(
record.someIntArray,
flattener.makeJsonPathExtractor("$.someIntArray").apply(record)
);
Assert.assertEquals(
record.someStringArray,
flattener.makeJsonPathExtractor("$.someStringArray").apply(record)
);
Assert.assertEquals(
record.someIntValueMap,
flattener.makeJsonPathExtractor("$.someIntValueMap").apply(record)
);
Assert.assertEquals(
record.someStringValueMap,
flattener.makeJsonPathExtractor("$.someStringValueMap").apply(record)
);
Assert.assertEquals(
record.someUnion,
flattener.makeJsonPathExtractor("$.someUnion").apply(record)
);
Assert.assertEquals(
record.someNull,
flattener.makeJsonPathExtractor("$.someNull").apply(record)
);
Assert.assertEquals(
record.someFixed,
flattener.makeJsonPathExtractor("$.someFixed").apply(record)
);
Assert.assertEquals(
// Casted to an array by transformValue
record.someBytes.array(),
flattener.makeJsonPathExtractor("$.someBytes").apply(record)
);
Assert.assertEquals(
record.someEnum,
flattener.makeJsonPathExtractor("$.someEnum").apply(record)
);
Assert.assertEquals(
record.someRecord,
flattener.makeJsonPathExtractor("$.someRecord").apply(record)
);
Assert.assertEquals(
record.someLong,
flattener.makeJsonPathExtractor("$.someLong").apply(record)
);
Assert.assertEquals(
record.someInt,
flattener.makeJsonPathExtractor("$.someInt").apply(record)
);
Assert.assertEquals(
record.someFloat,
flattener.makeJsonPathExtractor("$.someFloat").apply(record)
);
}
@Test(expected = UnsupportedOperationException.class)
public void makeJsonQueryExtractor()
{
final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum();
final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false);
Assert.assertEquals(
record.timestamp,
flattener.makeJsonQueryExtractor("$.timestamp").apply(record)
);
}
}

View File

@ -45,8 +45,7 @@ import static org.mockito.Mockito.when;
*/ */
public class SchemaRegistryBasedAvroBytesDecoderTest public class SchemaRegistryBasedAvroBytesDecoderTest
{ {
private SchemaRegistryClient registry;
SchemaRegistryClient registry;
@Before @Before
public void setUp() public void setUp()
@ -96,7 +95,7 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
} }
byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws IOException private byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws IOException
{ {
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(schema); DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(schema);

View File

@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners; import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -74,7 +75,7 @@ public class ParquetAvroHadoopInputRowParser implements InputRowParser<GenericRe
this.recordFlattener = ObjectFlatteners.create( this.recordFlattener = ObjectFlatteners.create(
flattenSpec, flattenSpec,
new AvroFlattenerMaker(false, this.binaryAsString) new AvroFlattenerMaker(this.binaryAsString)
); );
} }
@ -92,6 +93,7 @@ public class ParquetAvroHadoopInputRowParser implements InputRowParser<GenericRe
/** /**
* imitate avro extension {@link org.apache.druid.data.input.avro.AvroParsers#parseGenericRecord} * imitate avro extension {@link org.apache.druid.data.input.avro.AvroParsers#parseGenericRecord}
*/ */
@Nonnull
@Override @Override
public List<InputRow> parseBatch(GenericRecord record) public List<InputRow> parseBatch(GenericRecord record)
{ {