Explain Avro's unnecessary EOFException (#4098) (#4100)

* Explain Avro's unnecessary EOFException (#4098)

* add jira link into log message
This commit is contained in:
Benedict Jin 2017-03-24 23:45:45 +08:00 committed by Himanshu
parent 2cbc4764f8
commit 23f77ebd20
3 changed files with 28 additions and 12 deletions

View File

@ -25,11 +25,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.druid.guice.annotations.Json;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.ParseException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@ -37,6 +35,7 @@ import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.ByteBufferInputStream;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
@ -45,7 +44,7 @@ import java.util.Map;
*/
public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder
{
private static final Logger logger = new Logger(InlineSchemaAvroBytesDecoder.class);
private static final Logger LOGGER = new Logger(InlineSchemaAvroBytesDecoder.class);
private final Schema schemaObj;
private final Map<String, Object> schema;
@ -59,9 +58,9 @@ public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder
Preconditions.checkArgument(schema != null, "schema must be provided");
this.schema = schema;
String schemaStr = mapper.writeValueAsString(schema);;
String schemaStr = mapper.writeValueAsString(schema);
logger.info("Schema string [%s]", schemaStr);
LOGGER.info("Schema string [%s]", schemaStr);
schemaObj = new Schema.Parser().parse(schemaStr);
}
@ -87,6 +86,12 @@ public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder
try {
return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
}
catch (EOFException eof) {
// waiting for avro v1.9.0 (#AVRO-813)
throw new ParseException(
eof, "Avro's unnecessary EOFException, detail: [%s]", "https://issues.apache.org/jira/browse/AVRO-813"
);
}
catch (Exception e) {
throw new ParseException(e, "Fail to decode avro message!");
}

View File

@ -25,11 +25,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.druid.guice.annotations.Json;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.ParseException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@ -37,6 +35,7 @@ import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.ByteBufferInputStream;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
@ -46,7 +45,7 @@ import java.util.Map;
*/
public class InlineSchemasAvroBytesDecoder implements AvroBytesDecoder
{
private static final Logger logger = new Logger(InlineSchemasAvroBytesDecoder.class);
private static final Logger LOGGER = new Logger(InlineSchemasAvroBytesDecoder.class);
private static final byte V1 = 0x1;
@ -72,9 +71,10 @@ public class InlineSchemasAvroBytesDecoder implements AvroBytesDecoder
int id = Integer.parseInt(e.getKey());
Map<String, Object> schema = e.getValue();
String schemaStr = mapper.writeValueAsString(schema);;
String schemaStr = mapper.writeValueAsString(schema);
;
logger.info("Schema string [%s] = [%s]", id, schemaStr);
LOGGER.info("Schema string [%s] = [%s]", id, schemaStr);
schemaObjs.put(id, new Schema.Parser().parse(schemaStr));
}
}
@ -122,6 +122,12 @@ public class InlineSchemasAvroBytesDecoder implements AvroBytesDecoder
return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
}
catch (EOFException eof) {
// waiting for avro v1.9.0 (#AVRO-813)
throw new ParseException(
eof, "Avro's unnecessary EOFException, detail: [%s]", "https://issues.apache.org/jira/browse/AVRO-813"
);
}
catch (Exception e) {
throw new ParseException(e, "Fail to decode avro message with schemaId [%s].", schemaId);
}

View File

@ -20,11 +20,9 @@ package io.druid.data.input.avro;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.schemarepo.SubjectAndIdConverter;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.parsers.ParseException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@ -35,6 +33,7 @@ import org.schemarepo.Repository;
import org.schemarepo.api.TypedSchemaRepository;
import org.schemarepo.api.converter.AvroSchemaConverter;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
@ -83,6 +82,12 @@ public class SchemaRepoBasedAvroBytesDecoder<SUBJECT, ID> implements AvroBytesDe
try {
return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
}
catch (EOFException eof) {
// waiting for avro v1.9.0 (#AVRO-813)
throw new ParseException(
eof, "Avro's unnecessary EOFException, detail: [%s]", "https://issues.apache.org/jira/browse/AVRO-813"
);
}
catch (IOException e) {
throw new ParseException(e, "Fail to decode avro message!");
}