mirror of https://github.com/apache/druid.git
deleted DerbyMetadataRule/SegmentManagerProvider and replaced them with SQL ones
This commit is contained in:
parent
66277ddf2b
commit
0dda8975e7
|
@ -1,88 +0,0 @@
|
||||||
/*
|
|
||||||
* Druid - a distributed column store.
|
|
||||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
|
||||||
*
|
|
||||||
* This program is free software; you can redistribute it and/or
|
|
||||||
* modify it under the terms of the GNU General Public License
|
|
||||||
* as published by the Free Software Foundation; either version 2
|
|
||||||
* of the License, or (at your option) any later version.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU General Public License for more details.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU General Public License
|
|
||||||
* along with this program; if not, write to the Free Software
|
|
||||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.druid.db;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.google.common.base.Supplier;
|
|
||||||
import com.google.common.base.Throwables;
|
|
||||||
import com.google.inject.Inject;
|
|
||||||
import com.metamx.common.lifecycle.Lifecycle;
|
|
||||||
import org.skife.jdbi.v2.IDBI;
|
|
||||||
|
|
||||||
/**
|
|
||||||
*/
|
|
||||||
public class DerbyMetadataRuleManagerProvider implements MetadataRuleManagerProvider
|
|
||||||
{
|
|
||||||
private final ObjectMapper jsonMapper;
|
|
||||||
private final Supplier<MetadataRuleManagerConfig> config;
|
|
||||||
private final Supplier<MetadataStorageTablesConfig> dbTables;
|
|
||||||
private final MetadataStorageConnector dbConnector;
|
|
||||||
private final Lifecycle lifecycle;
|
|
||||||
private final IDBI dbi;
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
public DerbyMetadataRuleManagerProvider(
|
|
||||||
ObjectMapper jsonMapper,
|
|
||||||
Supplier<MetadataRuleManagerConfig> config,
|
|
||||||
Supplier<MetadataStorageTablesConfig> dbTables,
|
|
||||||
MetadataStorageConnector dbConnector,
|
|
||||||
IDBI dbi,
|
|
||||||
Lifecycle lifecycle
|
|
||||||
)
|
|
||||||
{
|
|
||||||
this.jsonMapper = jsonMapper;
|
|
||||||
this.config = config;
|
|
||||||
this.dbTables = dbTables;
|
|
||||||
this.dbConnector = dbConnector;
|
|
||||||
this.dbi = dbi;
|
|
||||||
this.lifecycle = lifecycle;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public DerbyMetadataRuleManager get()
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
lifecycle.addMaybeStartHandler(
|
|
||||||
new Lifecycle.Handler()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void start() throws Exception
|
|
||||||
{
|
|
||||||
dbConnector.createRulesTable();
|
|
||||||
SQLMetadataRuleManager.createDefaultRule(
|
|
||||||
dbi, dbTables.get().getRulesTable(), config.get().getDefaultRule(), jsonMapper
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void stop()
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
throw Throwables.propagate(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return new DerbyMetadataRuleManager(jsonMapper, config, dbTables, dbi);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -61,14 +61,13 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
@ManageLifecycle
|
@ManageLifecycle
|
||||||
public class DerbyMetadataSegmentManager implements MetadataSegmentManager
|
public class DerbyMetadataSegmentManager extends SQLMetadataSegmentManager
|
||||||
{
|
{
|
||||||
private static final Logger log = new Logger(DerbyMetadataSegmentManager.class);
|
private static final Logger log = new Logger(DerbyMetadataSegmentManager.class);
|
||||||
|
|
||||||
private final Object lock = new Object();
|
private final Object lock = new Object();
|
||||||
|
|
||||||
private final ObjectMapper jsonMapper;
|
private final ObjectMapper jsonMapper;
|
||||||
private final Supplier<MetadataSegmentManagerConfig> config;
|
|
||||||
private final Supplier<MetadataStorageTablesConfig> dbTables;
|
private final Supplier<MetadataStorageTablesConfig> dbTables;
|
||||||
private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSources;
|
private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSources;
|
||||||
private final IDBI dbi;
|
private final IDBI dbi;
|
||||||
|
@ -85,8 +84,8 @@ public class DerbyMetadataSegmentManager implements MetadataSegmentManager
|
||||||
IDBI dbi
|
IDBI dbi
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
super(jsonMapper, config, dbTables, dbi);
|
||||||
this.jsonMapper = jsonMapper;
|
this.jsonMapper = jsonMapper;
|
||||||
this.config = config;
|
|
||||||
this.dbTables = dbTables;
|
this.dbTables = dbTables;
|
||||||
this.dataSources = new AtomicReference<ConcurrentHashMap<String, DruidDataSource>>(
|
this.dataSources = new AtomicReference<ConcurrentHashMap<String, DruidDataSource>>(
|
||||||
new ConcurrentHashMap<String, DruidDataSource>()
|
new ConcurrentHashMap<String, DruidDataSource>()
|
||||||
|
@ -94,50 +93,6 @@ public class DerbyMetadataSegmentManager implements MetadataSegmentManager
|
||||||
this.dbi = dbi;
|
this.dbi = dbi;
|
||||||
}
|
}
|
||||||
|
|
||||||
@LifecycleStart
|
|
||||||
public void start()
|
|
||||||
{
|
|
||||||
synchronized (lock) {
|
|
||||||
if (started) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.exec = Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d");
|
|
||||||
|
|
||||||
final Duration delay = config.get().getPollDuration().toStandardDuration();
|
|
||||||
ScheduledExecutors.scheduleWithFixedDelay(
|
|
||||||
exec,
|
|
||||||
new Duration(0),
|
|
||||||
delay,
|
|
||||||
new Runnable()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void run()
|
|
||||||
{
|
|
||||||
poll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
started = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@LifecycleStop
|
|
||||||
public void stop()
|
|
||||||
{
|
|
||||||
synchronized (lock) {
|
|
||||||
if (!started) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
started = false;
|
|
||||||
dataSources.set(new ConcurrentHashMap<String, DruidDataSource>());
|
|
||||||
exec.shutdownNow();
|
|
||||||
exec = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean enableDatasource(final String ds)
|
public boolean enableDatasource(final String ds)
|
||||||
{
|
{
|
||||||
|
@ -238,171 +193,6 @@ public class DerbyMetadataSegmentManager implements MetadataSegmentManager
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean enableSegment(final String segmentId)
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
dbi.withHandle(
|
|
||||||
new HandleCallback<Void>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public Void withHandle(Handle handle) throws Exception
|
|
||||||
{
|
|
||||||
handle.createStatement(
|
|
||||||
String.format("UPDATE %s SET used=true WHERE id = :id", getSegmentsTable())
|
|
||||||
)
|
|
||||||
.bind("id", segmentId)
|
|
||||||
.execute();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
log.error(e, "Exception enabling segment %s", segmentId);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean removeDatasource(final String ds)
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
ConcurrentHashMap<String, DruidDataSource> dataSourceMap = dataSources.get();
|
|
||||||
|
|
||||||
if (!dataSourceMap.containsKey(ds)) {
|
|
||||||
log.warn("Cannot delete datasource %s, does not exist", ds);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
dbi.withHandle(
|
|
||||||
new HandleCallback<Void>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public Void withHandle(Handle handle) throws Exception
|
|
||||||
{
|
|
||||||
handle.createStatement(
|
|
||||||
String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", getSegmentsTable())
|
|
||||||
)
|
|
||||||
.bind("dataSource", ds)
|
|
||||||
.execute();
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
dataSourceMap.remove(ds);
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
log.error(e, "Error removing datasource %s", ds);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean removeSegment(String ds, final String segmentID)
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
dbi.withHandle(
|
|
||||||
new HandleCallback<Void>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public Void withHandle(Handle handle) throws Exception
|
|
||||||
{
|
|
||||||
handle.createStatement(
|
|
||||||
String.format("UPDATE %s SET used=false WHERE id = :segmentID", getSegmentsTable())
|
|
||||||
).bind("segmentID", segmentID)
|
|
||||||
.execute();
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
ConcurrentHashMap<String, DruidDataSource> dataSourceMap = dataSources.get();
|
|
||||||
|
|
||||||
if (!dataSourceMap.containsKey(ds)) {
|
|
||||||
log.warn("Cannot find datasource %s", ds);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
DruidDataSource dataSource = dataSourceMap.get(ds);
|
|
||||||
dataSource.removePartition(segmentID);
|
|
||||||
|
|
||||||
if (dataSource.isEmpty()) {
|
|
||||||
dataSourceMap.remove(ds);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
log.error(e, e.toString());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isStarted()
|
|
||||||
{
|
|
||||||
return started;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public DruidDataSource getInventoryValue(String key)
|
|
||||||
{
|
|
||||||
return dataSources.get().get(key);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Collection<DruidDataSource> getInventory()
|
|
||||||
{
|
|
||||||
return dataSources.get().values();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Collection<String> getAllDatasourceNames()
|
|
||||||
{
|
|
||||||
synchronized (lock) {
|
|
||||||
return dbi.withHandle(
|
|
||||||
new HandleCallback<List<String>>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public List<String> withHandle(Handle handle) throws Exception
|
|
||||||
{
|
|
||||||
return handle.createQuery(
|
|
||||||
String.format("SELECT DISTINCT(datasource) FROM %s", getSegmentsTable())
|
|
||||||
)
|
|
||||||
.fold(
|
|
||||||
Lists.<String>newArrayList(),
|
|
||||||
new Folder3<ArrayList<String>, Map<String, Object>>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public ArrayList<String> fold(
|
|
||||||
ArrayList<String> druidDataSources,
|
|
||||||
Map<String, Object> stringObjectMap,
|
|
||||||
FoldController foldController,
|
|
||||||
StatementContext statementContext
|
|
||||||
) throws SQLException
|
|
||||||
{
|
|
||||||
druidDataSources.add(
|
|
||||||
MapUtils.getString(stringObjectMap, "datasource")
|
|
||||||
);
|
|
||||||
return druidDataSources;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void poll()
|
public void poll()
|
||||||
{
|
{
|
||||||
|
|
|
@ -1,83 +0,0 @@
|
||||||
/*
|
|
||||||
* Druid - a distributed column store.
|
|
||||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
|
||||||
*
|
|
||||||
* This program is free software; you can redistribute it and/or
|
|
||||||
* modify it under the terms of the GNU General Public License
|
|
||||||
* as published by the Free Software Foundation; either version 2
|
|
||||||
* of the License, or (at your option) any later version.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU General Public License for more details.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU General Public License
|
|
||||||
* along with this program; if not, write to the Free Software
|
|
||||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
||||||
*/
|
|
||||||
|
|
||||||
|
|
||||||
package io.druid.db;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.google.common.base.Supplier;
|
|
||||||
import com.google.inject.Inject;
|
|
||||||
import com.metamx.common.lifecycle.Lifecycle;
|
|
||||||
import org.skife.jdbi.v2.IDBI;
|
|
||||||
|
|
||||||
public class DerbyMetadataSegmentManagerProvider implements MetadataSegmentManagerProvider
|
|
||||||
{
|
|
||||||
private final ObjectMapper jsonMapper;
|
|
||||||
private final Supplier<MetadataSegmentManagerConfig> config;
|
|
||||||
private final Supplier<MetadataStorageTablesConfig> dbTables;
|
|
||||||
private final MetadataStorageConnector dbConnector;
|
|
||||||
private final IDBI dbi;
|
|
||||||
private final Lifecycle lifecycle;
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
public DerbyMetadataSegmentManagerProvider(
|
|
||||||
ObjectMapper jsonMapper,
|
|
||||||
Supplier<MetadataSegmentManagerConfig> config,
|
|
||||||
Supplier<MetadataStorageTablesConfig> dbTables,
|
|
||||||
MetadataStorageConnector dbConnector,
|
|
||||||
IDBI dbi,
|
|
||||||
Lifecycle lifecycle
|
|
||||||
)
|
|
||||||
{
|
|
||||||
this.jsonMapper = jsonMapper;
|
|
||||||
this.config = config;
|
|
||||||
this.dbTables = dbTables;
|
|
||||||
this.dbConnector = dbConnector;
|
|
||||||
this.dbi = dbi;
|
|
||||||
this.lifecycle = lifecycle;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public MetadataSegmentManager get()
|
|
||||||
{
|
|
||||||
lifecycle.addHandler(
|
|
||||||
new Lifecycle.Handler()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void start() throws Exception
|
|
||||||
{
|
|
||||||
dbConnector.createSegmentTable();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void stop()
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
return new DerbyMetadataSegmentManager(
|
|
||||||
jsonMapper,
|
|
||||||
config,
|
|
||||||
dbTables,
|
|
||||||
dbi
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -24,9 +24,9 @@ import com.google.inject.Key;
|
||||||
import com.google.inject.Module;
|
import com.google.inject.Module;
|
||||||
import com.google.inject.Provides;
|
import com.google.inject.Provides;
|
||||||
import io.druid.db.DerbyMetadataRuleManager;
|
import io.druid.db.DerbyMetadataRuleManager;
|
||||||
import io.druid.db.DerbyMetadataRuleManagerProvider;
|
import io.druid.db.SQLMetadataRuleManagerProvider;
|
||||||
import io.druid.db.DerbyMetadataSegmentManager;
|
import io.druid.db.DerbyMetadataSegmentManager;
|
||||||
import io.druid.db.DerbyMetadataSegmentManagerProvider;
|
import io.druid.db.SQLMetadataSegmentManagerProvider;
|
||||||
import io.druid.db.IndexerSQLMetadataStorageCoordinator;
|
import io.druid.db.IndexerSQLMetadataStorageCoordinator;
|
||||||
import io.druid.db.MetadataRuleManager;
|
import io.druid.db.MetadataRuleManager;
|
||||||
import io.druid.db.MetadataSegmentManager;
|
import io.druid.db.MetadataSegmentManager;
|
||||||
|
@ -59,13 +59,13 @@ public class DerbyMetadataStorageDruidModule implements Module
|
||||||
binder, "druid.db.type", Key.get(MetadataSegmentManager.class), Key.get(DerbyMetadataSegmentManager.class)
|
binder, "druid.db.type", Key.get(MetadataSegmentManager.class), Key.get(DerbyMetadataSegmentManager.class)
|
||||||
);
|
);
|
||||||
PolyBind.createChoice(
|
PolyBind.createChoice(
|
||||||
binder, "druid.db.type", Key.get(MetadataSegmentManagerProvider.class), Key.get(DerbyMetadataSegmentManagerProvider.class)
|
binder, "druid.db.type", Key.get(MetadataSegmentManagerProvider.class), Key.get(SQLMetadataSegmentManagerProvider.class)
|
||||||
);
|
);
|
||||||
PolyBind.createChoice(
|
PolyBind.createChoice(
|
||||||
binder, "druid.db.type", Key.get(MetadataRuleManager.class), Key.get(DerbyMetadataRuleManager.class)
|
binder, "druid.db.type", Key.get(MetadataRuleManager.class), Key.get(DerbyMetadataRuleManager.class)
|
||||||
);
|
);
|
||||||
PolyBind.createChoice(
|
PolyBind.createChoice(
|
||||||
binder, "druid.db.type", Key.get(MetadataRuleManagerProvider.class), Key.get(DerbyMetadataRuleManagerProvider.class)
|
binder, "druid.db.type", Key.get(MetadataRuleManagerProvider.class), Key.get(SQLMetadataRuleManagerProvider.class)
|
||||||
);
|
);
|
||||||
PolyBind.createChoice(
|
PolyBind.createChoice(
|
||||||
binder, "druid.db.type", Key.get(SegmentPublisher.class), Key.get(SQLMetadataSegmentPublisher.class)
|
binder, "druid.db.type", Key.get(SegmentPublisher.class), Key.get(SQLMetadataSegmentPublisher.class)
|
||||||
|
@ -99,7 +99,7 @@ public class DerbyMetadataStorageDruidModule implements Module
|
||||||
|
|
||||||
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class))
|
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class))
|
||||||
.addBinding("derby")
|
.addBinding("derby")
|
||||||
.to(DerbyMetadataSegmentManagerProvider.class)
|
.to(SQLMetadataSegmentManagerProvider.class)
|
||||||
.in(LazySingleton.class);
|
.in(LazySingleton.class);
|
||||||
|
|
||||||
PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class))
|
PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class))
|
||||||
|
@ -109,7 +109,7 @@ public class DerbyMetadataStorageDruidModule implements Module
|
||||||
|
|
||||||
PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class))
|
PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class))
|
||||||
.addBinding("derby")
|
.addBinding("derby")
|
||||||
.to(DerbyMetadataRuleManagerProvider.class)
|
.to(SQLMetadataRuleManagerProvider.class)
|
||||||
.in(LazySingleton.class);
|
.in(LazySingleton.class);
|
||||||
|
|
||||||
PolyBind.optionBinder(binder, Key.get(SegmentPublisher.class))
|
PolyBind.optionBinder(binder, Key.get(SegmentPublisher.class))
|
||||||
|
|
Loading…
Reference in New Issue