Realtime: Remove MetadataUpdater

This commit is contained in:
Gian Merlino 2013-03-14 12:35:38 -07:00
parent e5d5050c3f
commit 9fe6a37f86
13 changed files with 139 additions and 151 deletions

View File

@ -8,8 +8,9 @@ import com.metamx.druid.client.ZKPhoneBook;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.realtime.MetadataUpdater;
import com.metamx.druid.realtime.RealtimeNode;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.phonebook.PhoneBook;
import java.io.File;
@ -41,10 +42,11 @@ public class RealtimeStandaloneMain
};
rn.setPhoneBook(dummyPhoneBook);
MetadataUpdater dummyMetadataUpdater =
new MetadataUpdater(null, null) {
SegmentAnnouncer dummySegmentAnnouncer =
new SegmentAnnouncer()
{
@Override
public void publishSegment(DataSegment segment) throws IOException
public void announceSegment(DataSegment segment) throws IOException
{
// do nothing
}
@ -54,17 +56,20 @@ public class RealtimeStandaloneMain
{
// do nothing
}
};
SegmentPublisher dummySegmentPublisher =
new SegmentPublisher()
{
@Override
public void announceSegment(DataSegment segment) throws IOException
public void publishSegment(DataSegment segment) throws IOException
{
// do nothing
}
};
// dummyMetadataUpdater will not send updates to db because standalone demo has no db
rn.setMetadataUpdater(dummyMetadataUpdater);
// dummySegmentPublisher will not send updates to db because standalone demo has no db
rn.setSegmentAnnouncer(dummySegmentAnnouncer);
rn.setSegmentPublisher(dummySegmentPublisher);
rn.setDataSegmentPusher(
new DataSegmentPusher()
{

View File

@ -8,8 +8,9 @@ import com.metamx.druid.client.ZKPhoneBook;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.realtime.MetadataUpdater;
import com.metamx.druid.realtime.RealtimeNode;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.phonebook.PhoneBook;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
@ -43,30 +44,34 @@ public class RealtimeStandaloneMain
};
rn.setPhoneBook(dummyPhoneBook);
final MetadataUpdater dummyMetadataUpdater =
new MetadataUpdater(null, null) {
final SegmentAnnouncer dummySegmentAnnouncer =
new SegmentAnnouncer()
{
@Override
public void announceSegment(DataSegment segment) throws IOException
{
// do nothing
}
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
// do nothing
}
};
SegmentPublisher dummySegmentPublisher =
new SegmentPublisher()
{
@Override
public void publishSegment(DataSegment segment) throws IOException
{
// do nothing
}
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
// do nothing
}
@Override
public void announceSegment(DataSegment segment) throws IOException
{
// do nothing
}
};
// dummyMetadataUpdater will not send updates to db because standalone demo has no db
rn.setMetadataUpdater(dummyMetadataUpdater);
// dummySegmentPublisher will not send updates to db because standalone demo has no db
rn.setSegmentAnnouncer(dummySegmentAnnouncer);
rn.setSegmentPublisher(dummySegmentPublisher);
rn.setDataSegmentPusher(
new DataSegmentPusher()
{

View File

@ -22,7 +22,6 @@ import com.metamx.druid.realtime.FireDepartmentConfig;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.MetadataUpdater;
import com.metamx.druid.realtime.Plumber;
import com.metamx.druid.realtime.RealtimePlumberSchool;
import com.metamx.druid.realtime.Schema;
@ -186,7 +185,8 @@ public class RealtimeIndexTask extends AbstractTask
realtimePlumberSchool.setDataSegmentPusher(toolbox.getSegmentPusher());
realtimePlumberSchool.setConglomerate(toolbox.getQueryRunnerFactoryConglomerate());
realtimePlumberSchool.setVersioningPolicy(versioningPolicy);
realtimePlumberSchool.setMetadataUpdater(new MetadataUpdater(lockingSegmentAnnouncer, segmentPublisher));
realtimePlumberSchool.setSegmentAnnouncer(lockingSegmentAnnouncer);
realtimePlumberSchool.setSegmentPublisher(segmentPublisher);
realtimePlumberSchool.setServerView(toolbox.getNewSegmentServerView());
realtimePlumberSchool.setServiceEmitter(toolbox.getEmitter());

View File

@ -62,10 +62,12 @@ import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.merger.common.actions.TaskActionToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.coordinator.DbTaskStorage;
@ -73,7 +75,6 @@ import com.metamx.druid.merger.coordinator.HeapMemoryTaskStorage;
import com.metamx.druid.merger.coordinator.LocalTaskRunner;
import com.metamx.druid.merger.coordinator.MergerDBCoordinator;
import com.metamx.druid.merger.coordinator.RemoteTaskRunner;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.coordinator.TaskLockbox;
import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
import com.metamx.druid.merger.coordinator.TaskQueue;
@ -85,7 +86,6 @@ import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.NoopAutoScalingStrategy;
@ -95,9 +95,9 @@ import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerFa
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagementStrategy;
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagmentConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.realtime.MetadataUpdaterConfig;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.realtime.ZkSegmentAnnouncer;
import com.metamx.druid.realtime.ZkSegmentAnnouncerConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
@ -529,7 +529,7 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
{
if (taskToolboxFactory == null) {
final SegmentAnnouncer segmentAnnouncer = new ZkSegmentAnnouncer(
configFactory.build(MetadataUpdaterConfig.class),
configFactory.build(ZkSegmentAnnouncerConfig.class),
getPhoneBook()
);
final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service);

View File

@ -59,9 +59,9 @@ import com.metamx.druid.merger.worker.Worker;
import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
import com.metamx.druid.merger.worker.WorkerTaskMonitor;
import com.metamx.druid.merger.worker.config.WorkerConfig;
import com.metamx.druid.realtime.MetadataUpdaterConfig;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.realtime.ZkSegmentAnnouncer;
import com.metamx.druid.realtime.ZkSegmentAnnouncerConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
@ -378,7 +378,7 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
if (taskToolboxFactory == null) {
final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service);
final SegmentAnnouncer segmentAnnouncer = new ZkSegmentAnnouncer(
configFactory.build(MetadataUpdaterConfig.class),
configFactory.build(ZkSegmentAnnouncerConfig.class),
getPhoneBook()
);
lifecycle.addManagedInstance(segmentAnnouncer);

View File

@ -17,12 +17,12 @@ public class DbSegmentPublisher implements SegmentPublisher
private static final Logger log = new Logger(DbSegmentPublisher.class);
private final ObjectMapper jsonMapper;
private final MetadataUpdaterConfig config;
private final DbSegmentPublisherConfig config;
private final DBI dbi;
public DbSegmentPublisher(
ObjectMapper jsonMapper,
MetadataUpdaterConfig config,
DbSegmentPublisherConfig config,
DBI dbi
)
{

View File

@ -0,0 +1,9 @@
package com.metamx.druid.realtime;
import org.skife.config.Config;
public abstract class DbSegmentPublisherConfig
{
@Config("druid.database.segmentTable")
public abstract String getSegmentTable();
}

View File

@ -1,35 +0,0 @@
package com.metamx.druid.realtime;
import com.metamx.druid.client.DataSegment;
import java.io.IOException;
public class MetadataUpdater implements SegmentAnnouncer, SegmentPublisher
{
private final SegmentAnnouncer segmentAnnouncer;
private final SegmentPublisher segmentPublisher;
public MetadataUpdater(SegmentAnnouncer segmentAnnouncer, SegmentPublisher segmentPublisher)
{
this.segmentAnnouncer = segmentAnnouncer;
this.segmentPublisher = segmentPublisher;
}
@Override
public void announceSegment(DataSegment segment) throws IOException
{
segmentAnnouncer.announceSegment(segment);
}
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
segmentAnnouncer.unannounceSegment(segment);
}
@Override
public void publishSegment(DataSegment segment) throws IOException
{
segmentPublisher.publishSegment(segment);
}
}

View File

@ -1,47 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
public abstract class MetadataUpdaterConfig
{
@Config("druid.host")
public abstract String getServerName();
@Config("druid.host")
public abstract String getHost();
@Config("druid.server.maxSize")
@Default("0")
public abstract long getMaxSize();
@Config("druid.database.segmentTable")
public abstract String getSegmentTable();
@Config("druid.zk.paths.announcementsPath")
public abstract String getAnnounceLocation();
@Config("druid.zk.paths.servedSegmentsPath")
public abstract String getServedSegmentsLocation();
}

View File

@ -77,7 +77,8 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
private final Map<String, Object> injectablesMap = Maps.newLinkedHashMap();
private MetadataUpdater metadataUpdater = null;
private SegmentAnnouncer segmentAnnouncer = null;
private SegmentPublisher segmentPublisher = null;
private DataSegmentPusher dataSegmentPusher = null;
private List<FireDepartment> fireDepartments = null;
private ServerView view = null;
@ -102,10 +103,17 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
return this;
}
public RealtimeNode setMetadataUpdater(MetadataUpdater metadataUpdater)
public RealtimeNode setSegmentAnnouncer(SegmentAnnouncer segmentAnnouncer)
{
Preconditions.checkState(this.metadataUpdater == null, "Cannot set metadataUpdater once it has already been set.");
this.metadataUpdater = metadataUpdater;
Preconditions.checkState(this.segmentAnnouncer == null, "Cannot set segmentAnnouncer once it has already been set.");
this.segmentAnnouncer = segmentAnnouncer;
return this;
}
public RealtimeNode setSegmentPublisher(SegmentPublisher segmentPublisher)
{
Preconditions.checkState(this.segmentPublisher == null, "Cannot set segmentPublisher once it has already been set.");
this.segmentPublisher = segmentPublisher;
return this;
}
@ -130,10 +138,16 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
return this;
}
public MetadataUpdater getMetadataUpdater()
public SegmentAnnouncer getSegmentAnnouncer()
{
initializeMetadataUpdater();
return metadataUpdater;
initializeSegmentAnnouncer();
return segmentAnnouncer;
}
public SegmentPublisher getSegmentPublisher()
{
initializeSegmentPublisher();
return segmentPublisher;
}
public DataSegmentPusher getDataSegmentPusher()
@ -157,7 +171,8 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
protected void doInit() throws Exception
{
initializeView();
initializeMetadataUpdater();
initializeSegmentAnnouncer();
initializeSegmentPublisher();
initializeSegmentPusher();
initializeJacksonInjectables();
@ -213,7 +228,8 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
injectables.put("queryRunnerFactoryConglomerate", getConglomerate());
injectables.put("segmentPusher", dataSegmentPusher);
injectables.put("metadataUpdater", metadataUpdater);
injectables.put("segmentAnnouncer", segmentAnnouncer);
injectables.put("segmentPublisher", segmentPublisher);
injectables.put("serverView", view);
injectables.put("serviceEmitter", getEmitter());
@ -253,21 +269,25 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
}
}
protected void initializeMetadataUpdater()
protected void initializeSegmentAnnouncer()
{
if (metadataUpdater == null) {
final MetadataUpdaterConfig metadataUpdaterConfig = getConfigFactory().build(MetadataUpdaterConfig.class);
final SegmentAnnouncer segmentAnnouncer = new ZkSegmentAnnouncer(metadataUpdaterConfig, getPhoneBook());
final SegmentPublisher segmentPublisher = new DbSegmentPublisher(
if (segmentAnnouncer == null) {
final ZkSegmentAnnouncerConfig zkSegmentAnnouncerConfig = getConfigFactory().build(ZkSegmentAnnouncerConfig.class);
segmentAnnouncer = new ZkSegmentAnnouncer(zkSegmentAnnouncerConfig, getPhoneBook());
getLifecycle().addManagedInstance(segmentAnnouncer);
}
}
protected void initializeSegmentPublisher()
{
if (segmentPublisher == null) {
final DbSegmentPublisherConfig dbSegmentPublisherConfig = getConfigFactory().build(DbSegmentPublisherConfig.class);
segmentPublisher = new DbSegmentPublisher(
getJsonMapper(),
metadataUpdaterConfig,
dbSegmentPublisherConfig,
new DbConnector(getConfigFactory().build(DbConnectorConfig.class)).getDBI()
);
getLifecycle().addManagedInstance(segmentAnnouncer);
getLifecycle().addManagedInstance(segmentPublisher);
metadataUpdater = new MetadataUpdater(segmentAnnouncer, segmentPublisher);
}
}

View File

@ -90,7 +90,8 @@ public class RealtimePlumberSchool implements PlumberSchool
private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
private volatile QueryRunnerFactoryConglomerate conglomerate = null;
private volatile DataSegmentPusher dataSegmentPusher = null;
private volatile MetadataUpdater metadataUpdater = null;
private volatile SegmentAnnouncer segmentAnnouncer = null;
private volatile SegmentPublisher segmentPublisher = null;
private volatile ServerView serverView = null;
private ServiceEmitter emitter;
@ -136,10 +137,16 @@ public class RealtimePlumberSchool implements PlumberSchool
this.dataSegmentPusher = dataSegmentPusher;
}
@JacksonInject("metadataUpdater")
public void setMetadataUpdater(MetadataUpdater metadataUpdater)
@JacksonInject("segmentAnnouncer")
public void setSegmentAnnouncer(SegmentAnnouncer segmentAnnouncer)
{
this.metadataUpdater = metadataUpdater;
this.segmentAnnouncer = segmentAnnouncer;
}
@JacksonInject("segmentPublisher")
public void setSegmentPublisher(SegmentPublisher segmentPublisher)
{
this.segmentPublisher = segmentPublisher;
}
@JacksonInject("serverView")
@ -200,7 +207,7 @@ public class RealtimePlumberSchool implements PlumberSchool
retVal = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval));
try {
metadataUpdater.announceSegment(retVal.getSegment());
segmentAnnouncer.announceSegment(retVal.getSegment());
sinks.put(truncatedTime, retVal);
}
catch (IOException e) {
@ -297,7 +304,7 @@ public class RealtimePlumberSchool implements PlumberSchool
for (final Sink sink : sinks.values()) {
try {
metadataUpdater.unannounceSegment(sink.getSegment());
segmentAnnouncer.unannounceSegment(sink.getSegment());
}
catch (Exception e) {
log.makeAlert("Failed to unannounce segment on shutdown")
@ -375,7 +382,7 @@ public class RealtimePlumberSchool implements PlumberSchool
Sink currSink = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval), hydrants);
sinks.put(sinkInterval.getStartMillis(), currSink);
metadataUpdater.announceSegment(currSink.getSegment());
segmentAnnouncer.announceSegment(currSink.getSegment());
}
catch (IOException e) {
log.makeAlert(e, "Problem loading sink[%s] from disk.", schema.getDataSource())
@ -415,7 +422,7 @@ public class RealtimePlumberSchool implements PlumberSchool
if (segment.getVersion().compareTo(sink.getSegment().getVersion()) >= 0) {
try {
metadataUpdater.unannounceSegment(sink.getSegment());
segmentAnnouncer.unannounceSegment(sink.getSegment());
FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
sinks.remove(sinkKey);
}
@ -527,7 +534,7 @@ public class RealtimePlumberSchool implements PlumberSchool
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
);
metadataUpdater.publishSegment(segment);
segmentPublisher.publishSegment(segment);
}
catch (IOException e) {
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
@ -711,7 +718,8 @@ public class RealtimePlumberSchool implements PlumberSchool
{
Preconditions.checkNotNull(conglomerate, "must specify a queryRunnerFactoryConglomerate to do this action.");
Preconditions.checkNotNull(dataSegmentPusher, "must specify a segmentPusher to do this action.");
Preconditions.checkNotNull(metadataUpdater, "must specify a metadataUpdater to do this action.");
Preconditions.checkNotNull(segmentAnnouncer, "must specify a segmentAnnouncer to do this action.");
Preconditions.checkNotNull(segmentPublisher, "must specify a segmentPublisher to do this action.");
Preconditions.checkNotNull(serverView, "must specify a serverView to do this action.");
Preconditions.checkNotNull(emitter, "must specify a serviceEmitter to do this action.");
}

View File

@ -18,14 +18,14 @@ public class ZkSegmentAnnouncer implements SegmentAnnouncer
private final Object lock = new Object();
private final MetadataUpdaterConfig config;
private final ZkSegmentAnnouncerConfig config;
private final PhoneBook yp;
private final String servedSegmentsLocation;
private volatile boolean started = false;
public ZkSegmentAnnouncer(
MetadataUpdaterConfig config,
ZkSegmentAnnouncerConfig config,
PhoneBook yp
)
{
@ -83,7 +83,7 @@ public class ZkSegmentAnnouncer implements SegmentAnnouncer
return;
}
log.info("Stopping MetadataUpdater with config[%s]", config);
log.info("Stopping ZkSegmentAnnouncer with config[%s]", config);
yp.unannounce(config.getAnnounceLocation(), config.getServerName());
started = false;

View File

@ -0,0 +1,23 @@
package com.metamx.druid.realtime;
import org.skife.config.Config;
import org.skife.config.Default;
public abstract class ZkSegmentAnnouncerConfig
{
@Config("druid.host")
public abstract String getServerName();
@Config("druid.host")
public abstract String getHost();
@Config("druid.server.maxSize")
@Default("0")
public abstract long getMaxSize();
@Config("druid.zk.paths.announcementsPath")
public abstract String getAnnounceLocation();
@Config("druid.zk.paths.servedSegmentsPath")
public abstract String getServedSegmentsLocation();
}