mirror of https://github.com/apache/druid.git
Fix serialization in TaskReportFileWriters. (#12938)
* Fix serialization in TaskReportFileWriters. For some reason, serializing a Map<String, TaskReport> would omit the "type" field. Explicitly sending each value through the ObjectMapper fixes this, because the type information does not get lost. * Fixes for static analysis.
This commit is contained in:
parent
c1a75fca3c
commit
35aaaa9573
|
@ -20,11 +20,9 @@
|
||||||
package org.apache.druid.indexing.common;
|
package org.apache.druid.indexing.common;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
|
||||||
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
@JsonTypeName("ingestionStatsAndErrors")
|
|
||||||
public class IngestionStatsAndErrorsTaskReport implements TaskReport
|
public class IngestionStatsAndErrorsTaskReport implements TaskReport
|
||||||
{
|
{
|
||||||
public static final String REPORT_KEY = "ingestionStatsAndErrors";
|
public static final String REPORT_KEY = "ingestionStatsAndErrors";
|
||||||
|
@ -90,13 +88,4 @@ public class IngestionStatsAndErrorsTaskReport implements TaskReport
|
||||||
", payload=" + payload +
|
", payload=" + payload +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
|
|
||||||
// TaskReports are put into a Map and serialized.
|
|
||||||
// Jackson doesn't normally serialize the TaskReports with a "type" field in that situation,
|
|
||||||
// so explictly serialize the "type" field (otherwise, deserialization fails).
|
|
||||||
@JsonProperty("type")
|
|
||||||
private String getType()
|
|
||||||
{
|
|
||||||
return "ingestionStatsAndErrors";
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.FileUtils;
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -49,7 +50,10 @@ public class MultipleFileTaskReportFileWriter implements TaskReportFileWriter
|
||||||
if (reportsFileParent != null) {
|
if (reportsFileParent != null) {
|
||||||
FileUtils.mkdirp(reportsFileParent);
|
FileUtils.mkdirp(reportsFileParent);
|
||||||
}
|
}
|
||||||
objectMapper.writeValue(reportsFile, reports);
|
|
||||||
|
try (final FileOutputStream outputStream = new FileOutputStream(reportsFile)) {
|
||||||
|
SingleFileTaskReportFileWriter.writeReportToStream(objectMapper, outputStream, reports);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
log.error(e, "Encountered exception in write().");
|
log.error(e, "Encountered exception in write().");
|
||||||
|
|
|
@ -19,11 +19,16 @@
|
||||||
|
|
||||||
package org.apache.druid.indexing.common;
|
package org.apache.druid.indexing.common;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonGenerator;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||||
import org.apache.druid.java.util.common.FileUtils;
|
import org.apache.druid.java.util.common.FileUtils;
|
||||||
|
import org.apache.druid.java.util.common.jackson.JacksonUtils;
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public class SingleFileTaskReportFileWriter implements TaskReportFileWriter
|
public class SingleFileTaskReportFileWriter implements TaskReportFileWriter
|
||||||
|
@ -46,17 +51,39 @@ public class SingleFileTaskReportFileWriter implements TaskReportFileWriter
|
||||||
if (reportsFileParent != null) {
|
if (reportsFileParent != null) {
|
||||||
FileUtils.mkdirp(reportsFileParent);
|
FileUtils.mkdirp(reportsFileParent);
|
||||||
}
|
}
|
||||||
objectMapper.writeValue(reportsFile, reports);
|
|
||||||
|
try (final FileOutputStream outputStream = new FileOutputStream(reportsFile)) {
|
||||||
|
writeReportToStream(objectMapper, outputStream, reports);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
log.error(e, "Encountered exception in write().");
|
log.error(e, "Encountered exception in write().");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setObjectMapper(ObjectMapper objectMapper)
|
public void setObjectMapper(ObjectMapper objectMapper)
|
||||||
{
|
{
|
||||||
this.objectMapper = objectMapper;
|
this.objectMapper = objectMapper;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void writeReportToStream(
|
||||||
|
final ObjectMapper objectMapper,
|
||||||
|
final OutputStream outputStream,
|
||||||
|
final Map<String, TaskReport> reports
|
||||||
|
) throws Exception
|
||||||
|
{
|
||||||
|
final SerializerProvider serializers = objectMapper.getSerializerProviderInstance();
|
||||||
|
|
||||||
|
try (final JsonGenerator jg = objectMapper.getFactory().createGenerator(outputStream)) {
|
||||||
|
jg.writeStartObject();
|
||||||
|
|
||||||
|
for (final Map.Entry<String, TaskReport> entry : reports.entrySet()) {
|
||||||
|
jg.writeFieldName(entry.getKey());
|
||||||
|
JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, entry.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
jg.writeEndObject();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,17 +25,24 @@ import com.google.common.collect.ImmutableMap;
|
||||||
import org.apache.druid.indexer.IngestionState;
|
import org.apache.druid.indexer.IngestionState;
|
||||||
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
|
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
|
||||||
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
|
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
|
||||||
|
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
|
||||||
import org.apache.druid.indexing.common.TaskReport;
|
import org.apache.druid.indexing.common.TaskReport;
|
||||||
import org.apache.druid.indexing.common.TestUtils;
|
import org.apache.druid.indexing.common.TestUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public class TaskReportSerdeTest
|
public class TaskReportSerdeTest
|
||||||
{
|
{
|
||||||
private final ObjectMapper jsonMapper;
|
private final ObjectMapper jsonMapper;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||||
|
|
||||||
public TaskReportSerdeTest()
|
public TaskReportSerdeTest()
|
||||||
{
|
{
|
||||||
TestUtils testUtils = new TestUtils();
|
TestUtils testUtils = new TestUtils();
|
||||||
|
@ -61,20 +68,22 @@ public class TaskReportSerdeTest
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
String report1serialized = jsonMapper.writeValueAsString(report1);
|
String report1serialized = jsonMapper.writeValueAsString(report1);
|
||||||
IngestionStatsAndErrorsTaskReport report2 = jsonMapper.readValue(
|
IngestionStatsAndErrorsTaskReport report2 = (IngestionStatsAndErrorsTaskReport) jsonMapper.readValue(
|
||||||
report1serialized,
|
report1serialized,
|
||||||
IngestionStatsAndErrorsTaskReport.class
|
TaskReport.class
|
||||||
);
|
);
|
||||||
Assert.assertEquals(report1, report2);
|
Assert.assertEquals(report1, report2);
|
||||||
Assert.assertEquals(report1.hashCode(), report2.hashCode());
|
Assert.assertEquals(report1.hashCode(), report2.hashCode());
|
||||||
|
|
||||||
|
final File reportFile = temporaryFolder.newFile();
|
||||||
|
final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile);
|
||||||
|
writer.setObjectMapper(jsonMapper);
|
||||||
Map<String, TaskReport> reportMap1 = TaskReport.buildTaskReports(report1);
|
Map<String, TaskReport> reportMap1 = TaskReport.buildTaskReports(report1);
|
||||||
String reportMapSerialized = jsonMapper.writeValueAsString(reportMap1);
|
writer.write("testID", reportMap1);
|
||||||
|
|
||||||
Map<String, TaskReport> reportMap2 = jsonMapper.readValue(
|
Map<String, TaskReport> reportMap2 = jsonMapper.readValue(
|
||||||
reportMapSerialized,
|
reportFile,
|
||||||
new TypeReference<Map<String, TaskReport>>()
|
new TypeReference<Map<String, TaskReport>>() {}
|
||||||
{
|
|
||||||
}
|
|
||||||
);
|
);
|
||||||
Assert.assertEquals(reportMap1, reportMap2);
|
Assert.assertEquals(reportMap1, reportMap2);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue