mirror of https://github.com/apache/druid.git
override metadata storage injection in CliHadoopIndexer
This commit is contained in:
parent
70f49538dd
commit
0498df25df
|
@ -23,10 +23,16 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import io.druid.metadata.MetadataStorageConnectorConfig;
|
import io.druid.metadata.MetadataStorageConnectorConfig;
|
||||||
|
|
||||||
|
import javax.validation.constraints.NotNull;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class MetadataStorageUpdaterJobSpec implements Supplier<MetadataStorageConnectorConfig>
|
public class MetadataStorageUpdaterJobSpec implements Supplier<MetadataStorageConnectorConfig>
|
||||||
{
|
{
|
||||||
|
@JsonProperty("type")
|
||||||
|
@NotNull
|
||||||
|
public String type;
|
||||||
|
|
||||||
@JsonProperty("connectURI")
|
@JsonProperty("connectURI")
|
||||||
public String connectURI;
|
public String connectURI;
|
||||||
|
|
||||||
|
@ -44,6 +50,11 @@ public class MetadataStorageUpdaterJobSpec implements Supplier<MetadataStorageCo
|
||||||
return segmentTable;
|
return segmentTable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getType()
|
||||||
|
{
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MetadataStorageConnectorConfig get()
|
public MetadataStorageConnectorConfig get()
|
||||||
{
|
{
|
||||||
|
|
|
@ -33,7 +33,6 @@ public class SQLMetadataSegmentManagerProvider implements MetadataSegmentManager
|
||||||
private final Supplier<MetadataSegmentManagerConfig> config;
|
private final Supplier<MetadataSegmentManagerConfig> config;
|
||||||
private final Supplier<MetadataStorageTablesConfig> storageConfig;
|
private final Supplier<MetadataStorageTablesConfig> storageConfig;
|
||||||
private final SQLMetadataConnector connector;
|
private final SQLMetadataConnector connector;
|
||||||
private final IDBI dbi;
|
|
||||||
private final Lifecycle lifecycle;
|
private final Lifecycle lifecycle;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
|
@ -49,7 +48,6 @@ public class SQLMetadataSegmentManagerProvider implements MetadataSegmentManager
|
||||||
this.config = config;
|
this.config = config;
|
||||||
this.storageConfig = storageConfig;
|
this.storageConfig = storageConfig;
|
||||||
this.connector = connector;
|
this.connector = connector;
|
||||||
this.dbi = this.connector.getDBI();
|
|
||||||
this.lifecycle = lifecycle;
|
this.lifecycle = lifecycle;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -46,13 +46,7 @@ public class SQLMetadataStorageActionHandlerTest
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
MetadataStorageConnectorConfig config = jsonMapper.readValue(
|
MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig();
|
||||||
"{"
|
|
||||||
+ "\"type\" : \"db\",\n"
|
|
||||||
+ "\"segmentTable\" : \"segments\"\n"
|
|
||||||
+ "}",
|
|
||||||
MetadataStorageConnectorConfig.class
|
|
||||||
);
|
|
||||||
|
|
||||||
connector = new TestDerbyConnector(
|
connector = new TestDerbyConnector(
|
||||||
Suppliers.ofInstance(config),
|
Suppliers.ofInstance(config),
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package io.druid.cli;
|
package io.druid.cli;
|
||||||
|
|
||||||
|
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
|
||||||
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||||
import com.google.api.client.util.Lists;
|
import com.google.api.client.util.Lists;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
@ -35,9 +36,12 @@ import io.druid.indexer.HadoopDruidIndexerJob;
|
||||||
import io.druid.indexer.JobHelper;
|
import io.druid.indexer.JobHelper;
|
||||||
import io.druid.indexer.Jobby;
|
import io.druid.indexer.Jobby;
|
||||||
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
|
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
|
||||||
|
import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
|
||||||
|
import io.druid.metadata.MetadataStorageConnectorConfig;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
|
@ -48,9 +52,12 @@ import java.util.List;
|
||||||
public class CliInternalHadoopIndexer extends GuiceRunnable
|
public class CliInternalHadoopIndexer extends GuiceRunnable
|
||||||
{
|
{
|
||||||
private static final Logger log = new Logger(CliHadoopIndexer.class);
|
private static final Logger log = new Logger(CliHadoopIndexer.class);
|
||||||
|
|
||||||
@Arguments(description = "A JSON object or the path to a file that contains a JSON object", required = true)
|
@Arguments(description = "A JSON object or the path to a file that contains a JSON object", required = true)
|
||||||
private String argumentSpec;
|
private String argumentSpec;
|
||||||
|
|
||||||
|
private HadoopDruidIndexerConfig config;
|
||||||
|
|
||||||
public CliInternalHadoopIndexer()
|
public CliInternalHadoopIndexer()
|
||||||
{
|
{
|
||||||
super(log);
|
super(log);
|
||||||
|
@ -65,8 +72,14 @@ public class CliInternalHadoopIndexer extends GuiceRunnable
|
||||||
@Override
|
@Override
|
||||||
public void configure(Binder binder)
|
public void configure(Binder binder)
|
||||||
{
|
{
|
||||||
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/hadoop-indexer");
|
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/internal-hadoop-indexer");
|
||||||
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
|
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
|
||||||
|
|
||||||
|
// bind metadata storage config based on HadoopIOConfig
|
||||||
|
MetadataStorageUpdaterJobSpec metadataSpec = getHadoopDruidIndexerConfig().getSchema()
|
||||||
|
.getIOConfig()
|
||||||
|
.getMetadataUpdateSpec();
|
||||||
|
binder.bind(MetadataStorageConnectorConfig.class).toInstance(metadataSpec.get());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
@ -75,10 +88,14 @@ public class CliInternalHadoopIndexer extends GuiceRunnable
|
||||||
@Override
|
@Override
|
||||||
public void run()
|
public void run()
|
||||||
{
|
{
|
||||||
Injector injector = makeInjector();
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
HadoopDruidIndexerConfig config = getHadoopDruidIndexerConfig();
|
Injector injector = makeInjector();
|
||||||
|
|
||||||
|
MetadataStorageUpdaterJobSpec metadataSpec = getHadoopDruidIndexerConfig().getSchema().getIOConfig().getMetadataUpdateSpec();
|
||||||
|
// override metadata storage type based on HadoopIOConfig
|
||||||
|
Preconditions.checkNotNull(metadataSpec.getType(), "type in metadataUpdateSpec must not be null");
|
||||||
|
injector.getInstance(Properties.class).setProperty("druid.metadata.storage.type", metadataSpec.getType());
|
||||||
|
|
||||||
List<Jobby> jobs = Lists.newArrayList();
|
List<Jobby> jobs = Lists.newArrayList();
|
||||||
jobs.add(new HadoopDruidDetermineConfigurationJob(config));
|
jobs.add(new HadoopDruidDetermineConfigurationJob(config));
|
||||||
jobs.add(new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class)));
|
jobs.add(new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class)));
|
||||||
|
@ -92,15 +109,18 @@ public class CliInternalHadoopIndexer extends GuiceRunnable
|
||||||
|
|
||||||
public HadoopDruidIndexerConfig getHadoopDruidIndexerConfig()
|
public HadoopDruidIndexerConfig getHadoopDruidIndexerConfig()
|
||||||
{
|
{
|
||||||
try {
|
if(config == null) {
|
||||||
if (argumentSpec.startsWith("{")) {
|
try {
|
||||||
return HadoopDruidIndexerConfig.fromString(argumentSpec);
|
if (argumentSpec.startsWith("{")) {
|
||||||
} else {
|
config = HadoopDruidIndexerConfig.fromString(argumentSpec);
|
||||||
return HadoopDruidIndexerConfig.fromFile(new File(argumentSpec));
|
} else {
|
||||||
|
config = HadoopDruidIndexerConfig.fromFile(new File(argumentSpec));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
return config;
|
||||||
throw Throwables.propagate(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue