mirror of https://github.com/apache/druid.git
Realtime:
- MetadataUpdater is now built from SegmentAnnouncer and SegmentPublisher instances.
- Sinks can take a version instead of always using interval.start. The realtime plumber selects a version using a VersioningPolicy.
- Plumbers gained a startJob method.
- Realtime plumbers gained an implementation for finishJob.
This commit is contained in:
parent 4126893ba3
commit 3fa46988f5
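Before the file-by-file hunks, here is how the new pieces compose. This is an illustrative sketch, not code from the commit; it condenses what the RealtimeNode hunk below does, with the node's collaborators passed in explicitly:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.realtime.DbSegmentPublisher;
import com.metamx.druid.realtime.MetadataUpdater;
import com.metamx.druid.realtime.MetadataUpdaterConfig;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.realtime.ZkSegmentAnnouncer;
import com.metamx.phonebook.PhoneBook;
import org.skife.jdbi.v2.DBI;

class MetadataWiringSketch
{
  // Condensed version of RealtimeNode.initializeMetadataUpdater() from the diff below:
  // MetadataUpdater no longer talks to ZK or the DB itself; it just delegates.
  static MetadataUpdater wire(MetadataUpdaterConfig config, PhoneBook phoneBook, ObjectMapper jsonMapper, DBI dbi)
  {
    final SegmentAnnouncer announcer = new ZkSegmentAnnouncer(config, phoneBook); // ZK announcements
    final SegmentPublisher publisher = new DbSegmentPublisher(jsonMapper, config, dbi); // segment-table inserts
    return new MetadataUpdater(announcer, publisher);
  }
}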
ThreadRenamingCallable.java (com.metamx.druid.guava, new file)
@@ -0,0 +1,52 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012  Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 */

package com.metamx.druid.guava;

import java.util.concurrent.Callable;

/**
 */
public abstract class ThreadRenamingCallable<T> implements Callable<T>
{
  private final String name;

  public ThreadRenamingCallable(
      String name
  )
  {
    this.name = name;
  }

  @Override
  public final T call()
  {
    final Thread currThread = Thread.currentThread();
    String currName = currThread.getName();
    try {
      currThread.setName(name);
      return doCall();
    }
    finally {
      currThread.setName(currName);
    }
  }

  public abstract T doCall();
}
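A minimal usage sketch (not part of the commit; the commit itself hands an instance of this class to ScheduledExecutors in RealtimePlumberSchool, see below):

import com.metamx.druid.guava.ThreadRenamingCallable;

class ThreadRenamingCallableSketch
{
  static ThreadRenamingCallable<Integer> example()
  {
    // While call() runs, the executing thread is renamed "example-worker";
    // the original name is restored afterwards, even if doCall() throws.
    return new ThreadRenamingCallable<Integer>("example-worker")
    {
      @Override
      public Integer doCall()
      {
        return 42;
      }
    };
  }
}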
RealtimeStandaloneMain.java (druid.examples)
@@ -1,21 +1,17 @@
 package druid.examples;

 import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.metamx.common.config.Config;
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.client.ZKPhoneBook;
-import com.metamx.druid.initialization.Initialization;
 import com.metamx.druid.jackson.DefaultObjectMapper;
 import com.metamx.druid.loading.DataSegmentPusher;
 import com.metamx.druid.log.LogLevelAdjuster;
 import com.metamx.druid.realtime.MetadataUpdater;
-import com.metamx.druid.realtime.MetadataUpdaterConfig;
 import com.metamx.druid.realtime.RealtimeNode;
-import com.metamx.druid.loading.DataSegmentPusher;
 import com.metamx.phonebook.PhoneBook;


 import java.io.File;
 import java.io.IOException;
@@ -46,10 +42,7 @@ public class RealtimeStandaloneMain

     rn.setPhoneBook(dummyPhoneBook);
     MetadataUpdater dummyMetadataUpdater =
-        new MetadataUpdater(new DefaultObjectMapper(),
-            Config.createFactory(Initialization.loadProperties()).build(MetadataUpdaterConfig.class),
-            dummyPhoneBook,
-            null) {
+        new MetadataUpdater(null, null) {
           @Override
           public void publishSegment(DataSegment segment) throws IOException
           {
RealtimeStandaloneMain.java (druid.examples, Twitter example)
@@ -1,22 +1,18 @@
 package druid.examples;

 import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.metamx.common.config.Config;
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.client.ZKPhoneBook;
-import com.metamx.druid.initialization.Initialization;
 import com.metamx.druid.jackson.DefaultObjectMapper;
 import com.metamx.druid.loading.DataSegmentPusher;
 import com.metamx.druid.log.LogLevelAdjuster;
 import com.metamx.druid.realtime.MetadataUpdater;
-import com.metamx.druid.realtime.MetadataUpdaterConfig;
 import com.metamx.druid.realtime.RealtimeNode;
-import com.metamx.druid.loading.DataSegmentPusher;
 import com.metamx.phonebook.PhoneBook;
 import druid.examples.twitter.TwitterSpritzerFirehoseFactory;


 import java.io.File;
 import java.io.IOException;
@@ -47,13 +43,8 @@ public class RealtimeStandaloneMain
     };

     rn.setPhoneBook(dummyPhoneBook);
-    MetadataUpdater dummyMetadataUpdater =
-        new MetadataUpdater(
-            new DefaultObjectMapper(),
-            Config.createFactory(Initialization.loadProperties()).build(MetadataUpdaterConfig.class),
-            dummyPhoneBook,
-            null
-        ) {
+    final MetadataUpdater dummyMetadataUpdater =
+        new MetadataUpdater(null, null) {
           @Override
           public void publishSegment(DataSegment segment) throws IOException
           {
DbSegmentPublisher.java (com.metamx.druid.realtime, new file)
@@ -0,0 +1,91 @@
package com.metamx.druid.realtime;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class DbSegmentPublisher implements SegmentPublisher
{
  private static final Logger log = new Logger(DbSegmentPublisher.class);

  private final ObjectMapper jsonMapper;
  private final MetadataUpdaterConfig config;
  private final DBI dbi;

  public DbSegmentPublisher(
      ObjectMapper jsonMapper,
      MetadataUpdaterConfig config,
      DBI dbi
  )
  {
    this.jsonMapper = jsonMapper;
    this.config = config;
    this.dbi = dbi;
  }

  public void publishSegment(final DataSegment segment) throws IOException
  {
    try {
      List<Map<String, Object>> exists = dbi.withHandle(
          new HandleCallback<List<Map<String, Object>>>()
          {
            @Override
            public List<Map<String, Object>> withHandle(Handle handle) throws Exception
            {
              return handle.createQuery(
                  String.format("SELECT id FROM %s WHERE id=:id", config.getSegmentTable())
              )
                           .bind("id", segment.getIdentifier())
                           .list();
            }
          }
      );

      if (!exists.isEmpty()) {
        log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
        return;
      }

      dbi.withHandle(
          new HandleCallback<Void>()
          {
            @Override
            public Void withHandle(Handle handle) throws Exception
            {
              handle.createStatement(
                  String.format(
                      "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
                      + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
                      config.getSegmentTable()
                  )
              )
                    .bind("id", segment.getIdentifier())
                    .bind("dataSource", segment.getDataSource())
                    .bind("created_date", new DateTime().toString())
                    .bind("start", segment.getInterval().getStart().toString())
                    .bind("end", segment.getInterval().getEnd().toString())
                    .bind("partitioned", segment.getShardSpec().getPartitionNum())
                    .bind("version", segment.getVersion())
                    .bind("used", true)
                    .bind("payload", jsonMapper.writeValueAsString(segment))
                    .execute();

              return null;
            }
          }
      );
    }
    catch (Exception e) {
      log.error(e, "Exception inserting into DB");
      throw new RuntimeException(e);
    }
  }
}
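A sketch of constructing and using the publisher in isolation (assumed setup only; in the commit, RealtimeNode builds it from its config factory and a DbConnector):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.realtime.DbSegmentPublisher;
import com.metamx.druid.realtime.MetadataUpdaterConfig;
import org.skife.jdbi.v2.DBI;

import java.io.IOException;

class DbSegmentPublisherSketch
{
  static void publish(MetadataUpdaterConfig config, DBI dbi, DataSegment segment) throws IOException
  {
    // The publisher is effectively idempotent: it SELECTs by segment id first
    // and skips the INSERT when the row already exists.
    final ObjectMapper jsonMapper = new DefaultObjectMapper();
    final DbSegmentPublisher publisher = new DbSegmentPublisher(jsonMapper, config, dbi);
    publisher.publishSegment(segment);
  }
}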
MetadataUpdater.java (com.metamx.druid.realtime)
@@ -1,201 +1,35 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012  Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-
 package com.metamx.druid.realtime;

-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableMap;
-import com.metamx.common.lifecycle.LifecycleStart;
-import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
-import com.metamx.phonebook.PhoneBook;
-
-import org.joda.time.DateTime;
-import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.tweak.HandleCallback;

 import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;

 /**
  */
-public class MetadataUpdater
+public class MetadataUpdater implements SegmentAnnouncer, SegmentPublisher
 {
   private static final Logger log = new Logger(MetadataUpdater.class);
+  private final SegmentAnnouncer segmentAnnouncer;
+  private final SegmentPublisher segmentPublisher;

-  private final Object lock = new Object();
-
-  private final ObjectMapper jsonMapper;
-  private final MetadataUpdaterConfig config;
-  private final PhoneBook yp;
-  private final String servedSegmentsLocation;
-  private final DBI dbi;
-
-  private volatile boolean started = false;
-
-  public MetadataUpdater(
-      ObjectMapper jsonMapper,
-      MetadataUpdaterConfig config,
-      PhoneBook yp,
-      DBI dbi
-  )
+  public MetadataUpdater(SegmentAnnouncer segmentAnnouncer, SegmentPublisher segmentPublisher)
   {
-    this.jsonMapper = jsonMapper;
-    this.config = config;
-    this.yp = yp;
-    this.servedSegmentsLocation = yp.combineParts(
-        Arrays.asList(
-            config.getServedSegmentsLocation(), config.getServerName()
-        )
-    );
-
-    this.dbi = dbi;
-  }
-
-  public Map<String, String> getStringProps()
-  {
-    return ImmutableMap.of(
-        "name", config.getServerName(),
-        "host", config.getHost(),
-        "maxSize", String.valueOf(config.getMaxSize()),
-        "type", "realtime"
-    );
-  }
-
-  public boolean hasStarted()
-  {
-    return started;
-  }
-
-  @LifecycleStart
-  public void start()
-  {
-    synchronized (lock) {
-      if (started) {
-        return;
-      }
-
-      log.info("Starting zkCoordinator for server[%s] with config[%s]", config.getServerName(), config);
-      if (yp.lookup(servedSegmentsLocation, Object.class) == null) {
-        yp.post(
-            config.getServedSegmentsLocation(),
-            config.getServerName(),
-            ImmutableMap.of("created", new DateTime().toString())
-        );
-      }
-
-      yp.announce(
-          config.getAnnounceLocation(),
-          config.getServerName(),
-          getStringProps()
-      );
-
-      started = true;
-    }
-  }
-
-  @LifecycleStop
-  public void stop()
-  {
-    synchronized (lock) {
-      if (!started) {
-        return;
-      }
-
-      log.info("Stopping MetadataUpdater with config[%s]", config);
-      yp.unannounce(config.getAnnounceLocation(), config.getServerName());
-
-      started = false;
-    }
-  }
+    this.segmentAnnouncer = segmentAnnouncer;
+    this.segmentPublisher = segmentPublisher;
+  }

+  @Override
   public void announceSegment(DataSegment segment) throws IOException
   {
     log.info("Announcing realtime segment %s", segment.getIdentifier());
-    yp.announce(servedSegmentsLocation, segment.getIdentifier(), segment);
+    segmentAnnouncer.announceSegment(segment);
   }

+  @Override
   public void unannounceSegment(DataSegment segment) throws IOException
   {
     log.info("Unannouncing realtime segment %s", segment.getIdentifier());
-    yp.unannounce(servedSegmentsLocation, segment.getIdentifier());
+    segmentAnnouncer.unannounceSegment(segment);
   }

-  public void publishSegment(final DataSegment segment) throws IOException
+  @Override
+  public void publishSegment(DataSegment segment) throws IOException
   {
-    try {
-      List<Map<String, Object>> exists = dbi.withHandle(
-          new HandleCallback<List<Map<String, Object>>>()
-          {
-            @Override
-            public List<Map<String, Object>> withHandle(Handle handle) throws Exception
-            {
-              return handle.createQuery(
-                  String.format("SELECT id FROM %s WHERE id=:id", config.getSegmentTable())
-              )
-                           .bind("id", segment.getIdentifier())
-                           .list();
-            }
-          }
-      );
-
-      if (!exists.isEmpty()) {
-        log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
-        return;
-      }
-
-      dbi.withHandle(
-          new HandleCallback<Void>()
-          {
-            @Override
-            public Void withHandle(Handle handle) throws Exception
-            {
-              handle.createStatement(
-                  String.format(
-                      "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
-                      + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
-                      config.getSegmentTable()
-                  )
-              )
-                    .bind("id", segment.getIdentifier())
-                    .bind("dataSource", segment.getDataSource())
-                    .bind("created_date", new DateTime().toString())
-                    .bind("start", segment.getInterval().getStart().toString())
-                    .bind("end", segment.getInterval().getEnd().toString())
-                    .bind("partitioned", segment.getShardSpec().getPartitionNum())
-                    .bind("version", segment.getVersion())
-                    .bind("used", true)
-                    .bind("payload", jsonMapper.writeValueAsString(segment))
-                    .execute();
-
-              return null;
-            }
-          }
-      );
-    }
-    catch (Exception e) {
-      log.error(e, "Exception inserting into DB");
-      throw new RuntimeException(e);
-    }
+    segmentPublisher.publishSegment(segment);
   }
 }
Plumber.java (com.metamx.druid.realtime)
@@ -22,12 +22,28 @@ package com.metamx.druid.realtime;
 import com.metamx.druid.Query;
 import com.metamx.druid.query.QueryRunner;

 /**
  */
 public interface Plumber
 {
+  /**
+   * Perform any initial setup. Should be called before using any other methods, and should be paired
+   * with a corresponding call to {@link #finishJob}.
+   */
+  public void startJob();
+
   public Sink getSink(long timestamp);
   public <T> QueryRunner<T> getQueryRunner(Query<T> query);

+  /**
+   * Persist any in-memory indexed data to durable storage. This may be only somewhat durable, e.g. the
+   * machine's local disk.
+   *
+   * @param commitRunnable code to run after persisting data
+   */
   void persist(Runnable commitRunnable);

+  /**
+   * Perform any final processing and clean up after ourselves. Should be called after all data has been
+   * fed into sinks and persisted.
+   */
+  public void finishJob();
 }
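The javadoc above pins down the call order. As a sketch, a driver (RealtimeManager, in the next hunk) is expected to use a plumber like this:

import com.metamx.druid.realtime.Plumber;

class PlumberLifecycleSketch
{
  static void run(Plumber plumber)
  {
    plumber.startJob(); // must come first, paired with finishJob()
    try {
      // ... feed rows to plumber.getSink(timestamp) as they arrive ...
      plumber.persist(new Runnable()
      {
        @Override
        public void run()
        {
          // runs after in-memory data is persisted, e.g. commit a firehose offset
        }
      });
    }
    finally {
      plumber.finishJob(); // unannounce sinks, shut down executors
    }
  }
}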
RealtimeManager.java (com.metamx.druid.realtime)
@@ -154,6 +154,8 @@ public class RealtimeManager implements QuerySegmentWalker
     final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod();

     try {
+      plumber.startJob();
+
       long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
       while (firehose.hasMore()) {
         final InputRow inputRow;
RealtimeNode.java (com.metamx.druid.realtime)
@@ -256,13 +256,18 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
   protected void initializeMetadataUpdater()
   {
     if (metadataUpdater == null) {
-      metadataUpdater = new MetadataUpdater(
+      final MetadataUpdaterConfig metadataUpdaterConfig = getConfigFactory().build(MetadataUpdaterConfig.class);
+      final SegmentAnnouncer segmentAnnouncer = new ZkSegmentAnnouncer(metadataUpdaterConfig, getPhoneBook());
+      final SegmentPublisher segmentPublisher = new DbSegmentPublisher(
           getJsonMapper(),
-          getConfigFactory().build(MetadataUpdaterConfig.class),
-          getPhoneBook(),
+          metadataUpdaterConfig,
           new DbConnector(getConfigFactory().build(DbConnectorConfig.class)).getDBI()
       );
-      getLifecycle().addManagedInstance(metadataUpdater);
+
+      getLifecycle().addManagedInstance(segmentAnnouncer);
+      getLifecycle().addManagedInstance(segmentPublisher);
+
+      metadataUpdater = new MetadataUpdater(segmentAnnouncer, segmentPublisher);
     }
   }

RealtimePlumberSchool.java (com.metamx.druid.realtime)
@@ -41,6 +41,7 @@ import com.metamx.druid.Query;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.client.DruidServer;
 import com.metamx.druid.client.ServerView;
+import com.metamx.druid.guava.ThreadRenamingCallable;
 import com.metamx.druid.guava.ThreadRenamingRunnable;
 import com.metamx.druid.index.QueryableIndex;
 import com.metamx.druid.index.QueryableIndexSegment;
@@ -58,11 +59,6 @@ import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import org.apache.commons.io.FileUtils;
-
-
-
-
-
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
@@ -75,7 +71,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;

@@ -90,9 +86,7 @@ public class RealtimePlumberSchool implements PlumberSchool
   private final File basePersistDirectory;
   private final IndexGranularity segmentGranularity;

-  private volatile Executor persistExecutor = null;
-  private volatile ScheduledExecutorService scheduledExecutor = null;
-
+  private volatile VersioningPolicy versioningPolicy = null;
   private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
   private volatile QueryRunnerFactoryConglomerate conglomerate = null;
   private volatile DataSegmentPusher dataSegmentPusher = null;
@@ -110,6 +104,7 @@ public class RealtimePlumberSchool implements PlumberSchool
     this.windowPeriod = windowPeriod;
     this.basePersistDirectory = basePersistDirectory;
     this.segmentGranularity = segmentGranularity;
+    this.versioningPolicy = new IntervalStartVersioningPolicy();
     this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();

     Preconditions.checkNotNull(windowPeriod, "RealtimePlumberSchool requires a windowPeriod.");
@@ -117,6 +112,12 @@ public class RealtimePlumberSchool implements PlumberSchool
     Preconditions.checkNotNull(segmentGranularity, "RealtimePlumberSchool requires a segmentGranularity.");
   }

+  @JsonProperty("versioningPolicy")
+  public void setVersioningPolicy(VersioningPolicy versioningPolicy)
+  {
+    this.versioningPolicy = versioningPolicy;
+  }
+
   @JsonProperty("rejectionPolicy")
   public void setRejectionPolicyFactory(RejectionPolicyFactory factory)
   {
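Because the setter is Jackson-bound via @JsonProperty("versioningPolicy") and the policy interface below registers the subtype name "intervalStart", the policy can be chosen from the realtime spec. The programmatic equivalent is the following sketch (it assumes VersioningPolicy and IntervalStartVersioningPolicy are the nested types declared later in this file):

class VersioningConfigSketch
{
  static void configure(RealtimePlumberSchool school)
  {
    // Same effect as `"versioningPolicy": {"type": "intervalStart"}` in the JSON spec,
    // and the same as the constructor default set above.
    school.setVersioningPolicy(new RealtimePlumberSchool.IntervalStartVersioningPolicy());
  }
}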
@@ -157,209 +158,28 @@ public class RealtimePlumberSchool implements PlumberSchool
   public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics)
   {
     verifyState();
-    initializeExecutors();
-
-    computeBaseDir(schema).mkdirs();
-
-    final Map<Long, Sink> sinks = Maps.newConcurrentMap();
-
-    for (File sinkDir : computeBaseDir(schema).listFiles()) {
-      Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/"));
-
-      final File[] sinkFiles = sinkDir.listFiles();
-      Arrays.sort(
-          sinkFiles,
-          new Comparator<File>()
-          {
-            @Override
-            public int compare(File o1, File o2)
-            {
-              try {
-                return Ints.compare(Integer.parseInt(o1.getName()), Integer.parseInt(o2.getName()));
-              }
-              catch (NumberFormatException e) {
-                log.error(e, "Couldn't compare as numbers? [%s][%s]", o1, o2);
-                return o1.compareTo(o2);
-              }
-            }
-          }
-      );
-
-      try {
-        List<FireHydrant> hydrants = Lists.newArrayList();
-        for (File segmentDir : sinkFiles) {
-          log.info("Loading previously persisted segment at [%s]", segmentDir);
-          hydrants.add(
-              new FireHydrant(
-                  new QueryableIndexSegment(null, IndexIO.loadIndex(segmentDir)),
-                  Integer.parseInt(segmentDir.getName())
-              )
-          );
-        }
-
-        Sink currSink = new Sink(sinkInterval, schema, hydrants);
-        sinks.put(sinkInterval.getStartMillis(), currSink);
-
-        metadataUpdater.announceSegment(currSink.getSegment());
-      }
-      catch (IOException e) {
-        log.makeAlert(e, "Problem loading sink[%s] from disk.", schema.getDataSource())
-           .addData("interval", sinkInterval)
-           .emit();
-      }
-    }
-
-    serverView.registerSegmentCallback(
-        persistExecutor,
-        new ServerView.BaseSegmentCallback()
-        {
-          @Override
-          public ServerView.CallbackAction segmentAdded(DruidServer server, DataSegment segment)
-          {
-            if ("realtime".equals(server.getType())) {
-              return ServerView.CallbackAction.CONTINUE;
-            }
-
-            log.debug("Checking segment[%s] on server[%s]", segment, server);
-            if (schema.getDataSource().equals(segment.getDataSource())) {
-              final Interval interval = segment.getInterval();
-              for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
-                final Long sinkKey = entry.getKey();
-                if (interval.contains(sinkKey)) {
-                  final Sink sink = entry.getValue();
-                  log.info("Segment matches sink[%s]", sink);
-
-                  if (segment.getVersion().compareTo(sink.getSegment().getVersion()) >= 0) {
-                    try {
-                      metadataUpdater.unannounceSegment(sink.getSegment());
-                      FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
-                      sinks.remove(sinkKey);
-                    }
-                    catch (IOException e) {
-                      log.makeAlert(e, "Unable to delete old segment for dataSource[%s].", schema.getDataSource())
-                         .addData("interval", sink.getInterval())
-                         .emit();
-                    }
-                  }
-                }
-              }
-            }
-
-            return ServerView.CallbackAction.CONTINUE;
-          }
-        }
-    );
-
-    final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis();
-    final long windowMillis = windowPeriod.toStandardDuration().getMillis();
     final RejectionPolicy rejectionPolicy = rejectionPolicyFactory.create(windowPeriod);
     log.info("Creating plumber using rejectionPolicy[%s]", rejectionPolicy);

-    log.info(
-        "Expect to run at [%s]",
-        new DateTime().plus(
-            new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis)
-        )
-    );
-
-    ScheduledExecutors
-        .scheduleAtFixedRate(
-            scheduledExecutor,
-            new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis),
-            new Duration(truncatedNow, segmentGranularity.increment(truncatedNow)),
-            new ThreadRenamingRunnable(String.format("%s-overseer", schema.getDataSource()))
-            {
-              @Override
-              public void doRun()
-              {
-                log.info("Starting merge and push.");
-
-                long minTimestamp = segmentGranularity.truncate(rejectionPolicy.getCurrMaxTime()).getMillis() - windowMillis;
-
-                List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
-                for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
-                  final Long intervalStart = entry.getKey();
-                  if (intervalStart < minTimestamp) {
-                    log.info("Adding entry[%s] for merge and push.", entry);
-                    sinksToPush.add(entry);
-                  }
-                }
-
-                for (final Map.Entry<Long, Sink> entry : sinksToPush) {
-                  final Sink sink = entry.getValue();
-
-                  final String threadName = String.format(
-                      "%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(entry.getKey())
-                  );
-                  persistExecutor.execute(
-                      new ThreadRenamingRunnable(threadName)
-                      {
-                        @Override
-                        public void doRun()
-                        {
-                          final Interval interval = sink.getInterval();
-
-                          for (FireHydrant hydrant : sink) {
-                            if (!hydrant.hasSwapped()) {
-                              log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
-                              final int rowCount = persistHydrant(hydrant, schema, interval);
-                              metrics.incrementRowOutputCount(rowCount);
-                            }
-                          }
-
-                          File mergedFile = null;
-                          try {
-                            List<QueryableIndex> indexes = Lists.newArrayList();
-                            for (FireHydrant fireHydrant : sink) {
-                              Segment segment = fireHydrant.getSegment();
-                              final QueryableIndex queryableIndex = segment.asQueryableIndex();
-                              log.info("Adding hydrant[%s]", fireHydrant);
-                              indexes.add(queryableIndex);
-                            }
-
-                            mergedFile = IndexMerger.mergeQueryableIndex(
-                                indexes,
-                                schema.getAggregators(),
-                                new File(computePersistDir(schema, interval), "merged")
-                            );
-
-                            QueryableIndex index = IndexIO.loadIndex(mergedFile);
-
-                            DataSegment segment = dataSegmentPusher.push(
-                                mergedFile,
-                                sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
-                            );
-
-                            metadataUpdater.publishSegment(segment);
-                          }
-                          catch (IOException e) {
-                            log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
-                               .addData("interval", interval)
-                               .emit();
-                          }
-
-                          if (mergedFile != null) {
-                            try {
-                              if (mergedFile != null) {
-                                log.info("Deleting Index File[%s]", mergedFile);
-                                FileUtils.deleteDirectory(mergedFile);
-                              }
-                            }
-                            catch (IOException e) {
-                              log.warn(e, "Error deleting directory[%s]", mergedFile);
-                            }
-                          }
-                        }
-                      }
-                  );
-                }
-              }
-            }
-        );
-
     return new Plumber()
     {
+      private volatile boolean stopped = false;
+      private volatile ExecutorService persistExecutor = null;
+      private volatile ScheduledExecutorService scheduledExecutor = null;
+
+      private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
+
+      @Override
+      public void startJob()
+      {
+        computeBaseDir(schema).mkdirs();
+        initializeExecutors();
+        bootstrapSinksFromDisk();
+        registerServerViewCallback();
+        startPersistThread();
+      }
+
       @Override
       public Sink getSink(long timestamp)
       {
@@ -372,14 +192,15 @@ public class RealtimePlumberSchool implements PlumberSchool
         Sink retVal = sinks.get(truncatedTime);

         if (retVal == null) {
-          retVal = new Sink(
-              new Interval(new DateTime(truncatedTime), segmentGranularity.increment(new DateTime(truncatedTime))),
-              schema
-          );
+          final Interval sinkInterval = new Interval(
+              new DateTime(truncatedTime),
+              segmentGranularity.increment(new DateTime(truncatedTime))
+          );
+
+          retVal = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval));

           try {
             metadataUpdater.announceSegment(retVal.getSegment());
-
             sinks.put(truncatedTime, retVal);
           }
           catch (IOException e) {
@@ -408,7 +229,6 @@ public class RealtimePlumberSchool implements PlumberSchool
           }
         };

-
         return factory.mergeRunners(
             EXEC,
             FunctionalIterable
@@ -473,83 +293,293 @@ public class RealtimePlumberSchool implements PlumberSchool
       @Override
       public void finishJob()
       {
-        throw new UnsupportedOperationException();
+        stopped = true;
+
+        for (final Sink sink : sinks.values()) {
+          try {
+            metadataUpdater.unannounceSegment(sink.getSegment());
+          }
+          catch (Exception e) {
+            log.makeAlert("Failed to unannounce segment on shutdown")
+               .addData("segment", sink.getSegment())
+               .emit();
+          }
+        }
+
+        // scheduledExecutor is shutdown here, but persistExecutor is shutdown when the
+        // ServerView sends it a new segment callback
+        if (scheduledExecutor != null) {
+          scheduledExecutor.shutdown();
+        }
+      }
+
+      private void initializeExecutors()
+      {
+        if (persistExecutor == null) {
+          persistExecutor = Executors.newFixedThreadPool(
+              1,
+              new ThreadFactoryBuilder()
+                  .setDaemon(true)
+                  .setNameFormat("plumber_persist_%d")
+                  .build()
+          );
+        }
+        if (scheduledExecutor == null) {
+          scheduledExecutor = Executors.newScheduledThreadPool(
+              1,
+              new ThreadFactoryBuilder()
+                  .setDaemon(true)
+                  .setNameFormat("plumber_scheduled_%d")
+                  .build()
+          );
+        }
+      }
+
+      private void bootstrapSinksFromDisk()
+      {
+        for (File sinkDir : computeBaseDir(schema).listFiles()) {
+          Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/"));
+
+          final File[] sinkFiles = sinkDir.listFiles();
+          Arrays.sort(
+              sinkFiles,
+              new Comparator<File>()
+              {
+                @Override
+                public int compare(File o1, File o2)
+                {
+                  try {
+                    return Ints.compare(Integer.parseInt(o1.getName()), Integer.parseInt(o2.getName()));
+                  }
+                  catch (NumberFormatException e) {
+                    log.error(e, "Couldn't compare as numbers? [%s][%s]", o1, o2);
+                    return o1.compareTo(o2);
+                  }
+                }
+              }
+          );
+
+          try {
+            List<FireHydrant> hydrants = Lists.newArrayList();
+            for (File segmentDir : sinkFiles) {
+              log.info("Loading previously persisted segment at [%s]", segmentDir);
+              hydrants.add(
+                  new FireHydrant(
+                      new QueryableIndexSegment(null, IndexIO.loadIndex(segmentDir)),
+                      Integer.parseInt(segmentDir.getName())
+                  )
+              );
+            }
+
+            Sink currSink = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval), hydrants);
+            sinks.put(sinkInterval.getStartMillis(), currSink);
+
+            metadataUpdater.announceSegment(currSink.getSegment());
+          }
+          catch (IOException e) {
+            log.makeAlert(e, "Problem loading sink[%s] from disk.", schema.getDataSource())
+               .addData("interval", sinkInterval)
+               .emit();
+          }
+        }
+      }
+
+      private void registerServerViewCallback()
+      {
+        serverView.registerSegmentCallback(
+            persistExecutor,
+            new ServerView.BaseSegmentCallback()
+            {
+              @Override
+              public ServerView.CallbackAction segmentAdded(DruidServer server, DataSegment segment)
+              {
+                if (stopped) {
+                  log.info("Unregistering ServerViewCallback");
+                  persistExecutor.shutdown();
+                  return ServerView.CallbackAction.UNREGISTER;
+                }
+
+                if ("realtime".equals(server.getType())) {
+                  return ServerView.CallbackAction.CONTINUE;
+                }
+
+                log.debug("Checking segment[%s] on server[%s]", segment, server);
+                if (schema.getDataSource().equals(segment.getDataSource())) {
+                  final Interval interval = segment.getInterval();
+                  for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
+                    final Long sinkKey = entry.getKey();
+                    if (interval.contains(sinkKey)) {
+                      final Sink sink = entry.getValue();
+                      log.info("Segment matches sink[%s]", sink);
+
+                      if (segment.getVersion().compareTo(sink.getSegment().getVersion()) >= 0) {
+                        try {
+                          metadataUpdater.unannounceSegment(sink.getSegment());
+                          FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
+                          sinks.remove(sinkKey);
+                        }
+                        catch (IOException e) {
+                          log.makeAlert(e, "Unable to delete old segment for dataSource[%s].", schema.getDataSource())
+                             .addData("interval", sink.getInterval())
+                             .emit();
+                        }
+                      }
+                    }
+                  }
+                }
+
+                return ServerView.CallbackAction.CONTINUE;
+              }
+            }
+        );
+      }
+
+      private void startPersistThread()
+      {
+        final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis();
+        final long windowMillis = windowPeriod.toStandardDuration().getMillis();
+
+        log.info(
+            "Expect to run at [%s]",
+            new DateTime().plus(
+                new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis)
+            )
+        );
+
+        ScheduledExecutors
+            .scheduleAtFixedRate(
+                scheduledExecutor,
+                new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis),
+                new Duration(truncatedNow, segmentGranularity.increment(truncatedNow)),
+                new ThreadRenamingCallable<ScheduledExecutors.Signal>(
+                    String.format(
+                        "%s-overseer-%d",
+                        schema.getDataSource(),
+                        schema.getShardSpec().getPartitionNum()
+                    )
+                )
+                {
+                  @Override
+                  public ScheduledExecutors.Signal doCall()
+                  {
+                    if (stopped) {
+                      log.info("Stopping merge-n-push overseer thread");
+                      return ScheduledExecutors.Signal.STOP;
+                    }
+
+                    log.info("Starting merge and push.");
+
+                    long minTimestamp = segmentGranularity.truncate(rejectionPolicy.getCurrMaxTime()).getMillis()
+                                        - windowMillis;
+
+                    List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
+                    for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
+                      final Long intervalStart = entry.getKey();
+                      if (intervalStart < minTimestamp) {
+                        log.info("Adding entry[%s] for merge and push.", entry);
+                        sinksToPush.add(entry);
+                      }
+                    }
+
+                    for (final Map.Entry<Long, Sink> entry : sinksToPush) {
+                      final Sink sink = entry.getValue();
+
+                      final String threadName = String.format(
+                          "%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(entry.getKey())
+                      );
+                      persistExecutor.execute(
+                          new ThreadRenamingRunnable(threadName)
+                          {
+                            @Override
+                            public void doRun()
+                            {
+                              final Interval interval = sink.getInterval();
+
+                              for (FireHydrant hydrant : sink) {
+                                if (!hydrant.hasSwapped()) {
+                                  log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
+                                  final int rowCount = persistHydrant(hydrant, schema, interval);
+                                  metrics.incrementRowOutputCount(rowCount);
+                                }
+                              }
+
+                              File mergedFile = null;
+                              try {
+                                List<QueryableIndex> indexes = Lists.newArrayList();
+                                for (FireHydrant fireHydrant : sink) {
+                                  Segment segment = fireHydrant.getSegment();
+                                  final QueryableIndex queryableIndex = segment.asQueryableIndex();
+                                  log.info("Adding hydrant[%s]", fireHydrant);
+                                  indexes.add(queryableIndex);
+                                }
+
+                                mergedFile = IndexMerger.mergeQueryableIndex(
+                                    indexes,
+                                    schema.getAggregators(),
+                                    new File(computePersistDir(schema, interval), "merged")
+                                );
+
+                                QueryableIndex index = IndexIO.loadIndex(mergedFile);
+
+                                DataSegment segment = dataSegmentPusher.push(
+                                    mergedFile,
+                                    sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
+                                );
+
+                                metadataUpdater.publishSegment(segment);
+                              }
+                              catch (IOException e) {
+                                log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
+                                   .addData("interval", interval)
+                                   .emit();
+                              }
+
+                              if (mergedFile != null) {
+                                try {
+                                  if (mergedFile != null) {
+                                    log.info("Deleting Index File[%s]", mergedFile);
+                                    FileUtils.deleteDirectory(mergedFile);
+                                  }
+                                }
+                                catch (IOException e) {
+                                  log.warn(e, "Error deleting directory[%s]", mergedFile);
+                                }
+                              }
+                            }
+                          }
+                      );
+                    }
+
+                    if (stopped) {
+                      log.info("Stopping merge-n-push overseer thread");
+                      return ScheduledExecutors.Signal.STOP;
+                    } else {
+                      return ScheduledExecutors.Signal.REPEAT;
+                    }
+                  }
+                }
+            );
+      }
     };
   }

-  private File computeBaseDir(Schema schema)
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+  @JsonSubTypes(value = {
+      @JsonSubTypes.Type(name = "intervalStart", value = IntervalStartVersioningPolicy.class)
+  })
+  public static interface VersioningPolicy
   {
-    return new File(basePersistDirectory, schema.getDataSource());
+    public String getVersion(Interval interval);
   }

-  private File computePersistDir(Schema schema, Interval interval)
+  public static class IntervalStartVersioningPolicy implements VersioningPolicy
   {
-    return new File(computeBaseDir(schema), interval.toString().replace("/", "_"));
-  }
-
-  /**
-   * Persists the given hydrant and returns the number of rows persisted
-   *
-   * @param indexToPersist
-   * @param schema
-   * @param interval
-   *
-   * @return the number of rows persisted
-   */
-  private int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval)
-  {
-    log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", schema.getDataSource(), interval, indexToPersist);
-    try {
-      int numRows = indexToPersist.getIndex().size();
-
-      File persistedFile = IndexMerger.persist(
-          indexToPersist.getIndex(),
-          new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
-      );
-
-      indexToPersist.swapSegment(new QueryableIndexSegment(null, IndexIO.loadIndex(persistedFile)));
-
-      return numRows;
-    }
-    catch (IOException e) {
-      log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource())
-         .addData("interval", interval)
-         .addData("count", indexToPersist.getCount())
-         .emit();
-
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private void verifyState()
-  {
-    Preconditions.checkNotNull(conglomerate, "must specify a queryRunnerFactoryConglomerate to do this action.");
-    Preconditions.checkNotNull(dataSegmentPusher, "must specify a segmentPusher to do this action.");
-    Preconditions.checkNotNull(metadataUpdater, "must specify a metadataUpdater to do this action.");
-    Preconditions.checkNotNull(serverView, "must specify a serverView to do this action.");
-    Preconditions.checkNotNull(emitter, "must specify a serviceEmitter to do this action.");
-  }
-
-  private void initializeExecutors()
-  {
-    if (persistExecutor == null) {
-      persistExecutor = Executors.newFixedThreadPool(
-          1,
-          new ThreadFactoryBuilder()
-              .setDaemon(true)
-              .setNameFormat("plumber_persist_%d")
-              .build()
-      );
-    }
-    if (scheduledExecutor == null) {
-      scheduledExecutor = Executors.newScheduledThreadPool(
-          1,
-          new ThreadFactoryBuilder()
-              .setDaemon(true)
-              .setNameFormat("plumber_scheduled_%d")
-              .build()
-      );
-    }
+    @Override
+    public String getVersion(Interval interval)
+    {
+      return interval.getStart().toString();
+    }
   }
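Any other versioning scheme just implements getVersion. For example, a hypothetical wall-clock policy (illustrative only, not in the commit) would make a newer run of the same interval win the `>= 0` version comparison in the server-view callback above:

import org.joda.time.DateTime;
import org.joda.time.Interval;

class CurrentTimeVersioningPolicy implements RealtimePlumberSchool.VersioningPolicy
{
  @Override
  public String getVersion(Interval interval)
  {
    // Version by creation time rather than interval start.
    return new DateTime().toString();
  }
}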
@@ -632,4 +662,57 @@ public class RealtimePlumberSchool implements PlumberSchool
     };
   }
   }
+
+  private File computeBaseDir(Schema schema)
+  {
+    return new File(basePersistDirectory, schema.getDataSource());
+  }
+
+  private File computePersistDir(Schema schema, Interval interval)
+  {
+    return new File(computeBaseDir(schema), interval.toString().replace("/", "_"));
+  }
+
+  /**
+   * Persists the given hydrant and returns the number of rows persisted
+   *
+   * @param indexToPersist
+   * @param schema
+   * @param interval
+   *
+   * @return the number of rows persisted
+   */
+  private int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval)
+  {
+    log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", schema.getDataSource(), interval, indexToPersist);
+    try {
+      int numRows = indexToPersist.getIndex().size();
+
+      File persistedFile = IndexMerger.persist(
+          indexToPersist.getIndex(),
+          new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
+      );
+
+      indexToPersist.swapSegment(new QueryableIndexSegment(null, IndexIO.loadIndex(persistedFile)));
+
+      return numRows;
+    }
+    catch (IOException e) {
+      log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource())
+         .addData("interval", interval)
+         .addData("count", indexToPersist.getCount())
+         .emit();
+
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private void verifyState()
+  {
+    Preconditions.checkNotNull(conglomerate, "must specify a queryRunnerFactoryConglomerate to do this action.");
+    Preconditions.checkNotNull(dataSegmentPusher, "must specify a segmentPusher to do this action.");
+    Preconditions.checkNotNull(metadataUpdater, "must specify a metadataUpdater to do this action.");
+    Preconditions.checkNotNull(serverView, "must specify a serverView to do this action.");
+    Preconditions.checkNotNull(emitter, "must specify a serviceEmitter to do this action.");
+  }
 }
SegmentAnnouncer.java (com.metamx.druid.realtime, new file)
@@ -0,0 +1,11 @@
package com.metamx.druid.realtime;

import com.metamx.druid.client.DataSegment;

import java.io.IOException;

public interface SegmentAnnouncer
{
  public void announceSegment(DataSegment segment) throws IOException;
  public void unannounceSegment(DataSegment segment) throws IOException;
}
SegmentPublisher.java (com.metamx.druid.realtime, new file)
@@ -0,0 +1,10 @@
package com.metamx.druid.realtime;

import com.metamx.druid.client.DataSegment;

import java.io.IOException;

public interface SegmentPublisher
{
  public void publishSegment(DataSegment segment) throws IOException;
}
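Both interfaces are deliberately tiny, which is what lets the example mains above stub out metadata handling. A no-op publisher along those lines (sketch, not in the commit):

import com.metamx.druid.client.DataSegment;
import com.metamx.druid.realtime.SegmentPublisher;

import java.io.IOException;

class NoopSegmentPublisher implements SegmentPublisher
{
  @Override
  public void publishSegment(DataSegment segment) throws IOException
  {
    // intentionally empty: useful for standalone setups with no metadata DB
  }
}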
Sink.java (com.metamx.druid.realtime)
@@ -50,16 +50,19 @@ public class Sink implements Iterable<FireHydrant>

   private final Interval interval;
   private final Schema schema;
+  private final String version;
   private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>();


   public Sink(
       Interval interval,
-      Schema schema
+      Schema schema,
+      String version
   )
   {
     this.schema = schema;
     this.interval = interval;
+    this.version = version;

     makeNewCurrIndex(interval.getStartMillis(), schema);
   }
@@ -67,11 +70,13 @@ public class Sink implements Iterable<FireHydrant>
   public Sink(
       Interval interval,
       Schema schema,
+      String version,
       List<FireHydrant> hydrants
   )
   {
     this.schema = schema;
     this.interval = interval;
+    this.version = version;

     for (int i = 0; i < hydrants.size(); ++i) {
       final FireHydrant hydrant = hydrants.get(i);
@@ -100,6 +105,13 @@ public class Sink implements Iterable<FireHydrant>
     }
   }

+  public boolean isEmpty()
+  {
+    synchronized (currIndex) {
+      return hydrants.size() == 1 && currIndex.getIndex().isEmpty();
+    }
+  }
+
   /**
    * If currIndex is A, creates a new index B, sets currIndex to B and returns A.
    *
@@ -122,7 +134,7 @@ public class Sink implements Iterable<FireHydrant>
     return new DataSegment(
         schema.getDataSource(),
         interval,
-        interval.getStart().toString(),
+        version,
         ImmutableMap.<String, Object>of(),
         Lists.<String>newArrayList(),
         Lists.transform(
ZkSegmentAnnouncer.java (com.metamx.druid.realtime, new file)
@@ -0,0 +1,104 @@
package com.metamx.druid.realtime;

import com.google.common.collect.ImmutableMap;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.phonebook.PhoneBook;
import org.joda.time.DateTime;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

public class ZkSegmentAnnouncer implements SegmentAnnouncer
{
  private static final Logger log = new Logger(ZkSegmentAnnouncer.class);

  private final Object lock = new Object();

  private final MetadataUpdaterConfig config;
  private final PhoneBook yp;
  private final String servedSegmentsLocation;

  private volatile boolean started = false;

  public ZkSegmentAnnouncer(
      MetadataUpdaterConfig config,
      PhoneBook yp
  )
  {
    this.config = config;
    this.yp = yp;
    this.servedSegmentsLocation = yp.combineParts(
        Arrays.asList(
            config.getServedSegmentsLocation(), config.getServerName()
        )
    );
  }

  public Map<String, String> getStringProps()
  {
    return ImmutableMap.of(
        "name", config.getServerName(),
        "host", config.getHost(),
        "maxSize", String.valueOf(config.getMaxSize()),
        "type", "realtime"
    );
  }

  @LifecycleStart
  public void start()
  {
    synchronized (lock) {
      if (started) {
        return;
      }

      log.info("Starting zkCoordinator for server[%s] with config[%s]", config.getServerName(), config);
      if (yp.lookup(servedSegmentsLocation, Object.class) == null) {
        yp.post(
            config.getServedSegmentsLocation(),
            config.getServerName(),
            ImmutableMap.of("created", new DateTime().toString())
        );
      }

      yp.announce(
          config.getAnnounceLocation(),
          config.getServerName(),
          getStringProps()
      );

      started = true;
    }
  }

  @LifecycleStop
  public void stop()
  {
    synchronized (lock) {
      if (!started) {
        return;
      }

      log.info("Stopping MetadataUpdater with config[%s]", config);
      yp.unannounce(config.getAnnounceLocation(), config.getServerName());

      started = false;
    }
  }

  public void announceSegment(DataSegment segment) throws IOException
  {
    log.info("Announcing realtime segment %s", segment.getIdentifier());
    yp.announce(servedSegmentsLocation, segment.getIdentifier(), segment);
  }

  public void unannounceSegment(DataSegment segment) throws IOException
  {
    log.info("Unannouncing realtime segment %s", segment.getIdentifier());
    yp.unannounce(servedSegmentsLocation, segment.getIdentifier());
  }
}
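Usage follows the node's lifecycle pattern (sketch; config and phoneBook are assumed to come from the node, as in RealtimeNode above):

import com.metamx.druid.client.DataSegment;
import com.metamx.druid.realtime.MetadataUpdaterConfig;
import com.metamx.druid.realtime.ZkSegmentAnnouncer;
import com.metamx.phonebook.PhoneBook;

import java.io.IOException;

class ZkAnnouncerSketch
{
  static void announce(MetadataUpdaterConfig config, PhoneBook phoneBook, DataSegment segment) throws IOException
  {
    final ZkSegmentAnnouncer announcer = new ZkSegmentAnnouncer(config, phoneBook);
    announcer.start(); // posts the server entry and announces the server
    announcer.announceSegment(segment); // per-segment announcement under servedSegmentsLocation
    // ... announcer.stop() on shutdown unannounces the server
  }
}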