Add context to HadoopIngestionSpec (#13624)

* add context to HadoopIngestionSpec

* fix alert
Maytas Monsereenusorn 2023-01-09 14:37:02 -10:00 committed by GitHub
parent a800dae87a
commit 62a105ee65
4 changed files with 108 additions and 6 deletions
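
At a glance, the patch adds an optional top-level "context" map to HadoopIngestionSpec, defaulting to an empty map when absent, and has HadoopIndexTask copy its task-level context into the spec. The defaulting rule is easy to check in isolation; below is a minimal sketch using a hypothetical ContextOnlySpec stand-in (the real spec needs Druid's injected ObjectMapper, omitted here):

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for HadoopIngestionSpec, reduced to the new field.
public class ContextOnlySpec
{
  private final Map<String, Object> context;

  @JsonCreator
  public ContextOnlySpec(@JsonProperty("context") Map<String, Object> context)
  {
    // Same defaulting as the patch: null becomes an empty HashMap,
    // and a supplied map is defensively copied.
    this.context = context == null ? new HashMap<>() : new HashMap<>(context);
  }

  @JsonProperty("context")
  public Map<String, Object> getContext()
  {
    return context;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    System.out.println(mapper.readValue("{\"context\":{\"userid\":12345}}", ContextOnlySpec.class).getContext()); // {userid=12345}
    System.out.println(mapper.readValue("{}", ContextOnlySpec.class).getContext()); // {}
  }
}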

View File: HadoopIngestionSpec.java

@@ -39,6 +39,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -55,12 +56,15 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
   //this is used in the temporary paths on the hdfs unique to an hadoop indexing task
   private final String uniqueId;
 
+  private final Map<String, Object> context;
+
   @JsonCreator
   public HadoopIngestionSpec(
       @JsonProperty("dataSchema") DataSchema dataSchema,
       @JsonProperty("ioConfig") HadoopIOConfig ioConfig,
       @JsonProperty("tuningConfig") @Nullable HadoopTuningConfig tuningConfig,
-      @JsonProperty("uniqueId") @Nullable String uniqueId
+      @JsonProperty("uniqueId") @Nullable String uniqueId,
+      @JsonProperty("context") @Nullable Map<String, Object> context
   )
   {
     super(dataSchema, ioConfig, tuningConfig);
@@ -69,6 +73,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
     this.ioConfig = ioConfig;
     this.tuningConfig = tuningConfig == null ? HadoopTuningConfig.makeDefaultTuningConfig() : tuningConfig;
     this.uniqueId = uniqueId == null ? UUIDUtils.generateUuid() : uniqueId;
+    this.context = context == null ? new HashMap<>() : new HashMap<>(context);
   }
 
   //for unit tests
@@ -78,7 +83,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
       HadoopTuningConfig tuningConfig
   )
   {
-    this(dataSchema, ioConfig, tuningConfig, null);
+    this(dataSchema, ioConfig, tuningConfig, null, null);
   }
 
   @JsonProperty("dataSchema")
@@ -108,13 +113,20 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
     return uniqueId;
   }
 
+  @JsonProperty("context")
+  public Map<String, Object> getContext()
+  {
+    return context;
+  }
+
   public HadoopIngestionSpec withDataSchema(DataSchema schema)
   {
     return new HadoopIngestionSpec(
         schema,
         ioConfig,
         tuningConfig,
-        uniqueId
+        uniqueId,
+        context
     );
   }
 
@@ -124,7 +136,8 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
         dataSchema,
         config,
         tuningConfig,
-        uniqueId
+        uniqueId,
+        context
     );
   }
 
@@ -134,7 +147,19 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
         dataSchema,
         ioConfig,
         config,
-        uniqueId
+        uniqueId,
+        context
     );
   }
+
+  public HadoopIngestionSpec withContext(Map<String, Object> context)
+  {
+    return new HadoopIngestionSpec(
+        dataSchema,
+        ioConfig,
+        tuningConfig,
+        uniqueId,
+        context
+    );
+  }
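
Like the existing withDataSchema/withIOConfig/withTuningConfig methods above, the new withContext() is a copy-on-write wither: it builds a fresh spec carrying over every other field and never mutates the receiver. A standalone sketch of that pattern, with a hypothetical two-field record standing in for the spec:

import java.util.HashMap;
import java.util.Map;

public class WitherSketch
{
  // Hypothetical stand-in: only uniqueId and context, mirroring withContext() above.
  record SpecLike(String uniqueId, Map<String, Object> context)
  {
    SpecLike withContext(Map<String, Object> newContext)
    {
      // Copy everything else, swap only the context; the receiver is untouched.
      return new SpecLike(uniqueId, newContext);
    }
  }

  public static void main(String[] args)
  {
    SpecLike original = new SpecLike("abc123", new HashMap<>());
    SpecLike updated = original.withContext(Map.of("priority", 75)); // example key, not from the patch
    System.out.println(original.context()); // {} -- unchanged
    System.out.println(updated.context());  // {priority=75}
  }
}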

View File: HadoopIngestionSpecTest.java

@@ -22,6 +22,7 @@ package org.apache.druid.indexer;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
@@ -296,6 +297,34 @@ public class HadoopIngestionSpecTest
     Assert.assertNotEquals(id1, id2);
   }
 
+  @Test
+  public void testContext()
+  {
+    final HadoopIngestionSpec schemaWithContext = jsonReadWriteRead(
+        "{\"context\" : { \"userid\" : 12345, \"cluster\": \"prod\" } }",
+        HadoopIngestionSpec.class
+    );
+
+    Assert.assertEquals(ImmutableMap.of("userid", 12345, "cluster", "prod"), schemaWithContext.getContext());
+
+    final HadoopIngestionSpec schemaWithoutContext = jsonReadWriteRead(
+        "{\n"
+        + "  \"dataSchema\": {\n"
+        + "    \"dataSource\": \"foo\",\n"
+        + "    \"metricsSpec\": [],\n"
+        + "    \"granularitySpec\": {\n"
+        + "      \"type\": \"uniform\",\n"
+        + "      \"segmentGranularity\": \"hour\",\n"
+        + "      \"intervals\": [\"2012-01-01/P1D\"]\n"
+        + "    }\n"
+        + "  }\n"
+        + "}",
+        HadoopIngestionSpec.class
+    );
+
+    Assert.assertEquals(ImmutableMap.of(), schemaWithoutContext.getContext());
+  }
+
   @Test
   public void testNoCleanupOnFailure()
   {

View File: HadoopIndexTask.java

@@ -167,7 +167,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     );
     this.authorizerMapper = authorizerMapper;
     this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
-    this.spec = spec;
+    this.spec = context == null ? spec : spec.withContext(context);
 
     // Some HadoopIngestionSpec stuff doesn't make sense in the context of the indexing service
     Preconditions.checkArgument(
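
The effect of this one-line change: when the task is constructed with a non-null context, that map replaces the spec's context wholesale rather than merging key-by-key, since withContext() simply copies the new map. A hypothetical helper isolating the rule:

import java.util.HashMap;
import java.util.Map;

public class PropagationSketch
{
  // Hypothetical mirror of the constructor line above: replace, don't merge.
  static Map<String, Object> effectiveContext(Map<String, Object> specContext, Map<String, Object> taskContext)
  {
    return taskContext == null ? specContext : new HashMap<>(taskContext);
  }

  public static void main(String[] args)
  {
    Map<String, Object> fromSpec = Map.of("cluster", "prod");
    System.out.println(effectiveContext(fromSpec, null));                    // {cluster=prod}
    System.out.println(effectiveContext(fromSpec, Map.of("userid", 12345))); // {userid=12345}; spec keys dropped
  }
}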

View File: TaskSerdeTest.java

@@ -587,4 +587,52 @@ public class TaskSerdeTest
     Assert.assertEquals("blah", task.getClasspathPrefix());
     Assert.assertEquals("blah", task2.getClasspathPrefix());
   }
+
+  @Test
+  public void testHadoopIndexTaskWithContextSerde() throws Exception
+  {
+    final HadoopIndexTask task = new HadoopIndexTask(
+        null,
+        new HadoopIngestionSpec(
+            new DataSchema(
+                "foo",
+                null,
+                null,
+                new AggregatorFactory[0],
+                new UniformGranularitySpec(
+                    Granularities.DAY,
+                    null, ImmutableList.of(Intervals.of("2010-01-01/P1D"))
+                ),
+                null,
+                null,
+                jsonMapper
+            ), new HadoopIOConfig(ImmutableMap.of("paths", "bar"), null, null), null
+        ),
+        null,
+        null,
+        "blah",
+        jsonMapper,
+        ImmutableMap.of("userid", 12345, "username", "bob"),
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        null
+    );
+
+    final String json = jsonMapper.writeValueAsString(task);
+    final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.readValue(json, Task.class);
+
+    Assert.assertEquals("foo", task.getDataSource());
+
+    Assert.assertEquals(task.getId(), task2.getId());
+    Assert.assertEquals(task.getGroupId(), task2.getGroupId());
+    Assert.assertEquals(task.getDataSource(), task2.getDataSource());
+    Assert.assertEquals(
+        task.getSpec().getTuningConfig().getJobProperties(),
+        task2.getSpec().getTuningConfig().getJobProperties()
+    );
+
+    Assert.assertEquals("blah", task.getClasspathPrefix());
+    Assert.assertEquals("blah", task2.getClasspathPrefix());
+    Assert.assertEquals(ImmutableMap.of("userid", 12345, "username", "bob"), task2.getContext());
+    Assert.assertEquals(ImmutableMap.of("userid", 12345, "username", "bob"), task2.getSpec().getContext());
+  }
 }