fix realtime module + cleaner metadata abstraction

- better metadata abstractions
- more db->metadata renaming
This commit is contained in:
Xavier Léauté 2014-11-03 16:36:53 -08:00
parent 48e6bef250
commit f56d026d7d
13 changed files with 49 additions and 23 deletions

View File

@ -10,7 +10,7 @@ In addition to the configuration of some of the default modules in [Configuratio
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.runner.type`|Choices "local" or "remote". Indicates whether tasks should be run locally or in a distributed environment.|local|
|`druid.indexer.storage.type`|Choices are "local" or "db". Indicates whether incoming tasks should be stored locally (in heap) or in a database. Storing incoming tasks in a database allows for tasks to be resumed if the overlord should fail.|local|
|`druid.indexer.storage.type`|Choices are "local" or "metadata". Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. Storing incoming tasks in metadata storage allows for tasks to be resumed if the overlord should fail.|local|
|`druid.indexer.storage.recentlyFinishedThreshold`|A duration of time to store task results.|PT24H|
|`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE|
|`druid.indexer.queue.startDelay`|Sleep this long before starting overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M|

View File

@ -15,7 +15,7 @@ Depending on what `druid.storage.type` is set to, Druid will upload segments to
Make sure that the `druid.publish.type` on your real-time nodes is set to "db". Also make sure that `druid.storage.type` is set to a deep storage that makes sense. Some example configs:
```
druid.publish.type=db
druid.publish.type=metadata
druid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.metadata.storage.connector.user=druid

View File

@ -91,7 +91,7 @@ druid.indexer.runner.compressZnodes=true
druid.indexer.runner.minWorkerVersion=#{WORKER_VERSION}
# Store all task state in MySQL
druid.indexer.storage.type=db
druid.indexer.storage.type=metadata
druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor"]
@ -387,4 +387,4 @@ druid.emitter.http.recipientBaseUrl=#{EMITTER_URL}
# If you choose to compress ZK announcements, you must do so for every node type
druid.announcer.type=batch
druid.curator.compress=true
```
```

View File

@ -35,8 +35,8 @@ druid.zk.service.host=localhost
# The realtime config file.
druid.realtime.specFile=/path/to/specFile
# Choices: db (hand off segments), noop (do not hand off segments).
druid.publish.type=db
# Choices: metadata (hand off segments), noop (do not hand off segments).
druid.publish.type=metadata
druid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.metadata.storage.connector.user=druid
@ -90,7 +90,7 @@ druid.metadata.storage.connector.password=#{MYSQL_PW}
druid.metadata.storage.connector.useValidationQuery=true
druid.metadata.storage.tables.base=prod
druid.publish.type=db
druid.publish.type=metadata
druid.processing.numThreads=3

View File

@ -2,7 +2,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
# Change this config to db to hand off to the rest of the Druid cluster
# Change this config to metadata to hand off to the rest of the Druid cluster
druid.publish.type=noop
druid.processing.buffer.sizeBytes=100000000

View File

@ -31,6 +31,7 @@ import io.druid.metadata.MetadataRuleManager;
import io.druid.metadata.MetadataRuleManagerProvider;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.metadata.MetadataSegmentManagerProvider;
import io.druid.metadata.MetadataSegmentPublisher;
import io.druid.metadata.MetadataSegmentPublisherProvider;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.SQLMetadataConnector;
@ -96,7 +97,7 @@ public class SQLMetadataStorageDruidModule implements Module
PolyBind.createChoiceWithDefault(
binder,
PROPERTY,
Key.get(SegmentPublisher.class),
Key.get(MetadataSegmentPublisher.class),
Key.get(SQLMetadataSegmentPublisher.class),
defaultPropertyValue
);
@ -153,7 +154,7 @@ public class SQLMetadataStorageDruidModule implements Module
.to(SQLMetadataRuleManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(SegmentPublisher.class))
PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisher.class))
.addBinding(type)
.to(SQLMetadataSegmentPublisher.class)
.in(LazySingleton.class);

View File

@ -0,0 +1,26 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.metadata;
import io.druid.segment.realtime.SegmentPublisher;
public interface MetadataSegmentPublisher extends SegmentPublisher
{
}

View File

@ -19,11 +19,7 @@
package io.druid.metadata;
import io.druid.segment.realtime.SegmentPublisher;
/**
*/
public interface MetadataSegmentPublisherProvider extends SegmentPublisherProvider
{
public SegmentPublisher get();
public MetadataSegmentPublisher get();
}

View File

@ -37,7 +37,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
public class SQLMetadataSegmentPublisher implements SegmentPublisher
public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher
{
private static final Logger log = new Logger(SQLMetadataSegmentPublisher.class);

View File

@ -43,7 +43,7 @@ public class SQLMetadataSegmentPublisherProvider implements MetadataSegmentPubli
private ObjectMapper jsonMapper = null;
@Override
public SegmentPublisher get()
public MetadataSegmentPublisher get()
{
return new SQLMetadataSegmentPublisher(jsonMapper, config, connector);
}

View File

@ -29,7 +29,7 @@ import io.druid.segment.realtime.SegmentPublisher;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = NoopSegmentPublisherProvider.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "db", value = MetadataSegmentPublisherProvider.class)
@JsonSubTypes.Type(name = "metadata", value = MetadataSegmentPublisherProvider.class)
})
public interface SegmentPublisherProvider extends Provider<SegmentPublisher>
{

View File

@ -175,7 +175,7 @@ public class CliOverlord extends ServerRunnable
storageBinder.addBinding("local").to(HeapMemoryTaskStorage.class);
binder.bind(HeapMemoryTaskStorage.class).in(LazySingleton.class);
storageBinder.addBinding("db").to(MetadataTaskStorage.class).in(ManageLifecycle.class);
storageBinder.addBinding("metadata").to(MetadataTaskStorage.class).in(ManageLifecycle.class);
binder.bind(MetadataTaskStorage.class).in(LazySingleton.class);
}

View File

@ -25,6 +25,8 @@ import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import io.druid.cli.QueryJettyServerInitializer;
import io.druid.metadata.MetadataSegmentPublisher;
import io.druid.metadata.SQLMetadataSegmentPublisher;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.FireDepartment;
@ -47,18 +49,19 @@ public class RealtimeModule implements Module
@Override
public void configure(Binder binder)
{
PolyBind.createChoice(
PolyBind.createChoiceWithDefault(
binder,
"druid.publish.type",
Key.get(SegmentPublisher.class),
Key.get(SegmentPublisher.class)
null,
"metadata"
);
final MapBinder<String, SegmentPublisher> publisherBinder = PolyBind.optionBinder(
binder,
Key.get(SegmentPublisher.class)
);
publisherBinder.addBinding("noop").to(NoopSegmentPublisher.class);
binder.bind(SegmentPublisher.class).in(LazySingleton.class);
publisherBinder.addBinding("noop").to(NoopSegmentPublisher.class).in(LazySingleton.class);
publisherBinder.addBinding("metadata").to(MetadataSegmentPublisher.class).in(LazySingleton.class);
PolyBind.createChoice(
binder,