Update Avro to 1.8.0 (#5015)

The Druid Parquet extension uses Avro 1.8, so the Avro version itself also needs
to be updated to 1.8 to avoid classpath conflicts.
Fokko Driesprong 2017-11-02 16:08:41 +01:00 committed by Gian Merlino
parent 6840eabd87
commit 21e1bf68f6
8 changed files with 62 additions and 39 deletions
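
For context (not part of this commit): the Parquet extension's parquet-avro 1.8.2 uses Avro 1.8, so leaving the Avro extension on 1.7.7 would put two Avro versions on the classpath. The pom changes below bump the avro.version property and declare org.apache.avro:avro explicitly; a common alternative for keeping the transitive Avro version aligned in Maven is a dependencyManagement entry. A minimal sketch of that approach, not something this change does, follows:

<!-- Sketch only: pin one Avro version for the whole module so transitive
     pulls (e.g. from parquet-avro) resolve to the same artifact. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>${avro.version}</version>
    </dependency>
  </dependencies>
</dependencyManagement>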


@@ -27,6 +27,11 @@
<artifactId>parquet-avro</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-indexing-hadoop</artifactId>


@@ -37,7 +37,8 @@
<properties>
<schemarepo.version>0.1.3</schemarepo.version>
<confluent.version>3.0.1</confluent.version>
<avro.version>1.7.7</avro.version>
<avro.version>1.8.0</avro.version>
<pig.version>0.15.0</pig.version>
</properties>
<repositories>
@@ -48,6 +49,11 @@
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
@@ -105,7 +111,7 @@
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>0.15.0</version>
<version>${pig.version}</version>
<classifier>h2</classifier>
<scope>test</scope>
<exclusions>
@@ -122,7 +128,7 @@
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>piggybank</artifactId>
<version>0.15.0</version>
<version>${pig.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>


@@ -28,11 +28,12 @@ import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.io.FileUtils;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.junit.Before;
import org.junit.Test;
@@ -109,12 +110,13 @@ public class AvroHadoopInputRowParserTest
try {
// 0. write avro object into temp file.
File someAvroDatumFile = new File(tmpDir, "someAvroDatum.avro");
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
new GenericDatumWriter<GenericRecord>()
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(
new SpecificDatumWriter<>()
);
dataFileWriter.create(SomeAvroDatum.getClassSchema(), someAvroDatumFile);
dataFileWriter.append(datum);
dataFileWriter.close();
// 1. read avro files into Pig
pigServer = new PigServer(ExecType.LOCAL);
pigServer.registerQuery(
@@ -124,16 +126,28 @@ public class AvroHadoopInputRowParserTest
inputStorage
)
);
// 2. write new avro file using AvroStorage
File outputDir = new File(tmpDir, "output");
pigServer.store("A", String.valueOf(outputDir), outputStorage);
ExecJob job = pigServer.store("A", String.valueOf(outputDir), outputStorage);
while (!job.hasCompleted()) {
Thread.sleep(100);
}
assert (job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
// 3. read avro object from AvroStorage
reader = DataFileReader.openReader(
new File(outputDir, "part-m-00000.avro"),
new GenericDatumReader<GenericRecord>()
);
return reader.next();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
finally {
if (pigServer != null) {
pigServer.shutdown();


@@ -38,10 +38,10 @@ import io.druid.data.input.impl.TimestampSpec;
import io.druid.data.input.schemarepo.Avro1124RESTRepositoryClientWrapper;
import io.druid.data.input.schemarepo.Avro1124SubjectAndIdConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.junit.Before;
@@ -210,7 +210,7 @@ public class AvroStreamInputRowParserTest
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(byteBuffer.array());
// encode data
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(someAvroDatum.getSchema());
DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(someAvroDatum.getSchema());
// write avro datum to bytes
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
@@ -251,7 +251,7 @@ public class AvroStreamInputRowParserTest
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(byteBuffer.array());
// encode data
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(someAvroDatum.getSchema());
DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(someAvroDatum.getSchema());
// write avro datum to bytes
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
@@ -321,29 +321,27 @@ public class AvroStreamInputRowParserTest
assertEquals(SOME_INT_VALUE, inputRow.getMetric("someInt"));
}
public static GenericRecord buildSomeAvroDatum() throws IOException
public static SomeAvroDatum buildSomeAvroDatum() throws IOException
{
SomeAvroDatum datum = SomeAvroDatum.newBuilder()
.setTimestamp(DATE_TIME.getMillis())
.setEventType(EVENT_TYPE_VALUE)
.setId(ID_VALUE)
.setSomeOtherId(SOME_OTHER_ID_VALUE)
.setIsValid(true)
.setSomeFloat(SOME_FLOAT_VALUE)
.setSomeInt(SOME_INT_VALUE)
.setSomeLong(SOME_LONG_VALUE)
.setSomeIntArray(SOME_INT_ARRAY_VALUE)
.setSomeStringArray(SOME_STRING_ARRAY_VALUE)
.setSomeIntValueMap(SOME_INT_VALUE_MAP_VALUE)
.setSomeStringValueMap(SOME_STRING_VALUE_MAP_VALUE)
.setSomeUnion(SOME_UNION_VALUE)
.setSomeFixed(SOME_FIXED_VALUE)
.setSomeBytes(SOME_BYTES_VALUE)
.setSomeNull(null)
.setSomeEnum(MyEnum.ENUM1)
.setSomeRecord(SOME_RECORD_VALUE)
.build();
return datum;
return SomeAvroDatum.newBuilder()
.setTimestamp(DATE_TIME.getMillis())
.setEventType(EVENT_TYPE_VALUE)
.setId(ID_VALUE)
.setSomeOtherId(SOME_OTHER_ID_VALUE)
.setIsValid(true)
.setSomeFloat(SOME_FLOAT_VALUE)
.setSomeInt(SOME_INT_VALUE)
.setSomeLong(SOME_LONG_VALUE)
.setSomeIntArray(SOME_INT_ARRAY_VALUE)
.setSomeStringArray(SOME_STRING_ARRAY_VALUE)
.setSomeIntValueMap(SOME_INT_VALUE_MAP_VALUE)
.setSomeStringValueMap(SOME_STRING_VALUE_MAP_VALUE)
.setSomeUnion(SOME_UNION_VALUE)
.setSomeFixed(SOME_FIXED_VALUE)
.setSomeBytes(SOME_BYTES_VALUE)
.setSomeNull(null)
.setSomeEnum(MyEnum.ENUM1)
.setSomeRecord(SOME_RECORD_VALUE)
.build();
}
}


@@ -25,10 +25,10 @@ import io.druid.data.input.AvroStreamInputRowParserTest;
import io.druid.data.input.SomeAvroDatum;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.junit.Assert;
import org.junit.Test;
@@ -81,7 +81,7 @@ public class InlineSchemaAvroBytesDecoderTest
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(schema);
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
GenericRecord actual = new InlineSchemaAvroBytesDecoder(schema).parse(ByteBuffer.wrap(out.toByteArray()));


@@ -26,10 +26,10 @@ import io.druid.data.input.AvroStreamInputRowParserTest;
import io.druid.data.input.SomeAvroDatum;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.junit.Assert;
import org.junit.Test;
@@ -95,7 +95,7 @@ public class InlineSchemasAvroBytesDecoderTest
out.write(new byte[]{1});
out.write(ByteBuffer.allocate(4).putInt(10).array());
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(schema);
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
GenericRecord actual = new InlineSchemasAvroBytesDecoder(


@@ -24,10 +24,10 @@ import io.druid.data.input.AvroStreamInputRowParserTest;
import io.druid.data.input.SomeAvroDatum;
import io.druid.java.util.common.parsers.ParseException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -99,7 +99,7 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws IOException
{
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(schema);
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
return out.toByteArray();
}