mirror of https://github.com/apache/druid.git
make the injection gods happy
This commit is contained in:
parent
377151beda
commit
fb4d41cedb
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.indexing.overlord;
|
||||
|
||||
public interface MetadataStorageActionHandlerFactory
|
||||
{
|
||||
public <A,B,C,D> MetadataStorageActionHandler<A,B,C,D> create(MetadataStorageActionHandlerTypes<A,B,C,D> types);
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.indexing.overlord;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
|
||||
public interface MetadataStorageActionHandlerTypes<A,B,C,D>
|
||||
{
|
||||
public TypeReference<A> getTaskType();
|
||||
public TypeReference<B> getTaskStatusType();
|
||||
public TypeReference<C> getTaskActionType();
|
||||
public TypeReference<D> getTaskLockType();
|
||||
}
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package io.druid.indexing.overlord;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -45,6 +46,40 @@ import java.util.Map;
|
|||
|
||||
public class MetadataTaskStorage implements TaskStorage
|
||||
{
|
||||
private static final MetadataStorageActionHandlerTypes<Task, TaskStatus, TaskAction, TaskLock> TASK_TYPES = new MetadataStorageActionHandlerTypes<Task, TaskStatus, TaskAction, TaskLock>()
|
||||
{
|
||||
@Override
|
||||
public TypeReference<Task> getTaskType()
|
||||
{
|
||||
return new TypeReference<Task>()
|
||||
{
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public TypeReference<TaskStatus> getTaskStatusType()
|
||||
{
|
||||
return new TypeReference<TaskStatus>()
|
||||
{
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public TypeReference<TaskAction> getTaskActionType()
|
||||
{
|
||||
return new TypeReference<TaskAction>()
|
||||
{
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public TypeReference<TaskLock> getTaskLockType()
|
||||
{
|
||||
return new TypeReference<TaskLock>()
|
||||
{
|
||||
};
|
||||
}
|
||||
};
|
||||
private final MetadataStorageConnector metadataStorageConnector;
|
||||
private final TaskStorageConfig config;
|
||||
private final MetadataStorageActionHandler<Task, TaskStatus, TaskAction, TaskLock> handler;
|
||||
|
@ -55,13 +90,12 @@ public class MetadataTaskStorage implements TaskStorage
|
|||
public MetadataTaskStorage(
|
||||
final MetadataStorageConnector metadataStorageConnector,
|
||||
final TaskStorageConfig config,
|
||||
final MetadataStorageActionHandler handler
|
||||
final MetadataStorageActionHandlerFactory factory
|
||||
)
|
||||
{
|
||||
this.metadataStorageConnector = metadataStorageConnector;
|
||||
this.config = config;
|
||||
// this is a little janky but haven't figured out how to get Guice to do this yet.
|
||||
this.handler = (MetadataStorageActionHandler<Task, TaskStatus, TaskAction, TaskLock>) handler;
|
||||
this.handler = factory.create(TASK_TYPES);
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
|
|
|
@ -40,6 +40,7 @@ import io.druid.db.SQLMetadataSegmentManagerProvider;
|
|||
import io.druid.db.SQLMetadataSegmentPublisher;
|
||||
import io.druid.db.SQLMetadataSegmentPublisherProvider;
|
||||
import io.druid.db.SQLMetadataStorageActionHandler;
|
||||
import io.druid.db.SQLMetadataStorageActionHandlerFactory;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.LazySingleton;
|
||||
import io.druid.guice.PolyBind;
|
||||
|
@ -47,6 +48,7 @@ import io.druid.indexer.MetadataStorageUpdaterJobHandler;
|
|||
import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
|
||||
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
|
||||
import io.druid.indexing.overlord.MetadataStorageActionHandler;
|
||||
import io.druid.indexing.overlord.MetadataStorageActionHandlerFactory;
|
||||
import io.druid.initialization.DruidModule;
|
||||
import io.druid.segment.realtime.SegmentPublisher;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
|
@ -110,9 +112,9 @@ public class MySQLMetadataStorageModule implements DruidModule
|
|||
.to(SQLMetadataSegmentPublisherProvider.class)
|
||||
.in(LazySingleton.class);
|
||||
|
||||
PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandler.class))
|
||||
PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandlerFactory.class))
|
||||
.addBinding("mysql")
|
||||
.to(SQLMetadataStorageActionHandler.class)
|
||||
.to(SQLMetadataStorageActionHandlerFactory.class)
|
||||
.in(LazySingleton.class);
|
||||
|
||||
PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class))
|
||||
|
|
|
@ -40,6 +40,7 @@ import io.druid.db.SQLMetadataSegmentManagerProvider;
|
|||
import io.druid.db.SQLMetadataSegmentPublisher;
|
||||
import io.druid.db.SQLMetadataSegmentPublisherProvider;
|
||||
import io.druid.db.SQLMetadataStorageActionHandler;
|
||||
import io.druid.db.SQLMetadataStorageActionHandlerFactory;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.LazySingleton;
|
||||
import io.druid.guice.PolyBind;
|
||||
|
@ -47,6 +48,7 @@ import io.druid.indexer.MetadataStorageUpdaterJobHandler;
|
|||
import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
|
||||
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
|
||||
import io.druid.indexing.overlord.MetadataStorageActionHandler;
|
||||
import io.druid.indexing.overlord.MetadataStorageActionHandlerFactory;
|
||||
import io.druid.initialization.DruidModule;
|
||||
import io.druid.segment.realtime.SegmentPublisher;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
|
@ -105,9 +107,9 @@ public class PostgresMetadataStorageModule implements DruidModule
|
|||
.to(SQLMetadataSegmentPublisher.class)
|
||||
.in(LazySingleton.class);
|
||||
|
||||
PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandler.class))
|
||||
PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandlerFactory.class))
|
||||
.addBinding("postgresql")
|
||||
.to(SQLMetadataStorageActionHandler.class)
|
||||
.to(SQLMetadataStorageActionHandlerFactory.class)
|
||||
.in(LazySingleton.class);
|
||||
|
||||
PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisherProvider.class))
|
||||
|
@ -132,4 +134,4 @@ public class PostgresMetadataStorageModule implements DruidModule
|
|||
{
|
||||
return dbConnector.getDBI();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,11 +28,12 @@ import com.google.common.base.Predicate;
|
|||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.name.Named;
|
||||
import com.google.inject.Provider;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.common.RetryUtils;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.indexing.overlord.MetadataStorageActionHandler;
|
||||
import io.druid.indexing.overlord.MetadataStorageActionHandlerTypes;
|
||||
import io.druid.indexing.overlord.TaskExistsException;
|
||||
import org.joda.time.DateTime;
|
||||
import org.skife.jdbi.v2.FoldController;
|
||||
|
@ -71,28 +72,22 @@ public class SQLMetadataStorageActionHandler<TaskType, TaskStatusType, TaskActio
|
|||
private final TypeReference taskActionType;
|
||||
private final TypeReference taskLockType;
|
||||
|
||||
|
||||
@Inject
|
||||
public SQLMetadataStorageActionHandler(
|
||||
final IDBI dbi,
|
||||
final SQLMetadataConnector connector,
|
||||
final MetadataStorageTablesConfig config,
|
||||
final ObjectMapper jsonMapper,
|
||||
// we all love type erasure
|
||||
final @Named("taskType") TypeReference taskType,
|
||||
final @Named("taskStatusType") TypeReference taskStatusType,
|
||||
final @Named("taskActionType") TypeReference taskActionType,
|
||||
final @Named("taskLockType") TypeReference taskLockType
|
||||
final MetadataStorageActionHandlerTypes<TaskType, TaskStatusType, TaskActionType, TaskLockType> types
|
||||
)
|
||||
{
|
||||
this.dbi = dbi;
|
||||
this.connector = connector;
|
||||
this.config = config;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.taskType = taskType;
|
||||
this.taskStatusType = taskStatusType;
|
||||
this.taskActionType = taskActionType;
|
||||
this.taskLockType = taskLockType;
|
||||
this.taskType = types.getTaskType();
|
||||
this.taskStatusType = types.getTaskStatusType();
|
||||
this.taskActionType = types.getTaskActionType();
|
||||
this.taskLockType = types.getTaskLockType();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.db;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.indexing.overlord.MetadataStorageActionHandler;
|
||||
import io.druid.indexing.overlord.MetadataStorageActionHandlerFactory;
|
||||
import io.druid.indexing.overlord.MetadataStorageActionHandlerTypes;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
|
||||
public class SQLMetadataStorageActionHandlerFactory implements MetadataStorageActionHandlerFactory
|
||||
{
|
||||
private final IDBI dbi;
|
||||
private final SQLMetadataConnector connector;
|
||||
private final MetadataStorageTablesConfig config;
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
@Inject
|
||||
public SQLMetadataStorageActionHandlerFactory(
|
||||
IDBI dbi,
|
||||
SQLMetadataConnector connector,
|
||||
MetadataStorageTablesConfig config,
|
||||
ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.dbi = dbi;
|
||||
this.connector = connector;
|
||||
this.config = config;
|
||||
this.jsonMapper = jsonMapper;
|
||||
}
|
||||
|
||||
public <A,B,C,D> MetadataStorageActionHandler<A,B,C,D> create(MetadataStorageActionHandlerTypes<A,B,C,D> types)
|
||||
{
|
||||
return new SQLMetadataStorageActionHandler<>(dbi, connector, config, jsonMapper, types);
|
||||
}
|
||||
}
|
|
@ -39,10 +39,12 @@ import io.druid.db.DerbyConnector;
|
|||
import io.druid.db.SQLMetadataStorageActionHandler;
|
||||
import io.druid.db.SQLMetadataSegmentPublisher;
|
||||
import io.druid.db.SQLMetadataSegmentPublisherProvider;
|
||||
import io.druid.db.SQLMetadataStorageActionHandlerFactory;
|
||||
import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
|
||||
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
|
||||
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
|
||||
import io.druid.indexing.overlord.MetadataStorageActionHandler;
|
||||
import io.druid.indexing.overlord.MetadataStorageActionHandlerFactory;
|
||||
import io.druid.segment.realtime.SegmentPublisher;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
|
||||
|
@ -136,9 +138,9 @@ public class DerbyMetadataStorageDruidModule implements Module
|
|||
.to(IndexerSQLMetadataStorageCoordinator.class)
|
||||
.in(LazySingleton.class);
|
||||
|
||||
PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandler.class))
|
||||
PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandlerFactory.class))
|
||||
.addBinding("derby")
|
||||
.to(SQLMetadataStorageActionHandler.class)
|
||||
.to(SQLMetadataStorageActionHandlerFactory.class)
|
||||
.in(LazySingleton.class);
|
||||
|
||||
PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class))
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.google.inject.Binder;
|
|||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.Provider;
|
||||
import com.google.inject.TypeLiteral;
|
||||
import com.google.inject.multibindings.MapBinder;
|
||||
import com.google.inject.name.Names;
|
||||
|
@ -58,6 +59,7 @@ import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
|
|||
import io.druid.indexing.overlord.ForkingTaskRunnerFactory;
|
||||
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
|
||||
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
|
||||
import io.druid.indexing.overlord.MetadataStorageActionHandlerTypes;
|
||||
import io.druid.indexing.overlord.MetadataTaskStorage;
|
||||
import io.druid.indexing.overlord.RemoteTaskRunnerFactory;
|
||||
import io.druid.indexing.overlord.TaskLockbox;
|
||||
|
@ -185,24 +187,6 @@ public class CliOverlord extends ServerRunnable
|
|||
|
||||
storageBinder.addBinding("db").to(MetadataTaskStorage.class).in(ManageLifecycle.class);
|
||||
binder.bind(MetadataTaskStorage.class).in(LazySingleton.class);
|
||||
|
||||
// gotta love type erasure
|
||||
binder.bind(TypeReference.class).annotatedWith(Names.named("taskType")).toInstance(
|
||||
new TypeReference<Task>()
|
||||
{
|
||||
}
|
||||
);
|
||||
binder.bind(TypeReference.class).annotatedWith(Names.named("taskStatusType")).toInstance(new TypeReference<TaskStatus>(){});
|
||||
binder.bind(TypeReference.class).annotatedWith(Names.named("taskActionType")).toInstance(
|
||||
new TypeReference<TaskAction>()
|
||||
{
|
||||
}
|
||||
);
|
||||
binder.bind(TypeReference.class).annotatedWith(Names.named("taskLockType")).toInstance(
|
||||
new TypeReference<TaskLock>()
|
||||
{
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private void configureRunners(Binder binder)
|
||||
|
|
Loading…
Reference in New Issue