diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java new file mode 100644 index 00000000000..37c7b842fc8 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java @@ -0,0 +1,97 @@ +package io.druid.segment.realtime.plumber; + +import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.query.QueryRunnerFactoryConglomerate; +import io.druid.segment.IndexGranularity; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.Schema; +import io.druid.server.coordination.DataSegmentAnnouncer; +import org.joda.time.Duration; +import org.joda.time.Period; + +import java.io.File; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +/** + */ +public class FlushingPlumber extends RealtimePlumber +{ + private static final EmittingLogger log = new EmittingLogger(FlushingPlumber.class); + + private final Duration flushDuration; + + public FlushingPlumber( + Duration flushDuration, + Period windowPeriod, + File basePersistDirectory, + IndexGranularity segmentGranularity, + Schema schema, + FireDepartmentMetrics metrics, + RejectionPolicy rejectionPolicy, + ServiceEmitter emitter, + QueryRunnerFactoryConglomerate conglomerate, + DataSegmentAnnouncer segmentAnnouncer, + ExecutorService queryExecutorService, + VersioningPolicy versioningPolicy + ) + { + super( + windowPeriod, + basePersistDirectory, + segmentGranularity, + schema, + metrics, + rejectionPolicy, + emitter, + conglomerate, + segmentAnnouncer, + queryExecutorService, + versioningPolicy, + null, + null, + null + ); + + this.flushDuration = flushDuration; + } + + @Override + public void startJob() + { + computeBaseDir(getSchema()).mkdirs(); + initializeExecutors(); + bootstrapSinksFromDisk(); + startPersistThread(); + } + + protected void flushAfterDuration(final long truncatedTime, final Sink sink) + { + ScheduledExecutors.scheduleWithFixedDelay( + getScheduledExecutor(), + flushDuration, + new Runnable() + { + @Override + public void run() + { + abandonSegment(truncatedTime, sink); + } + } + + ); + } + + @Override + public void finishJob() + { + log.info("Shutting down..."); + + for (final Map.Entry entry : getSinks().entrySet()) { + flushAfterDuration(entry.getKey(), entry.getValue()); + } + shutdownScheduledExecutor(); + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java new file mode 100644 index 00000000000..6e9e74260fa --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java @@ -0,0 +1,126 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.segment.realtime.plumber; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.guice.annotations.Processing; +import io.druid.query.QueryRunnerFactoryConglomerate; +import io.druid.segment.IndexGranularity; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.Schema; +import io.druid.server.coordination.DataSegmentAnnouncer; +import org.joda.time.Duration; +import org.joda.time.Period; + +import javax.validation.constraints.NotNull; +import java.io.File; +import java.util.concurrent.ExecutorService; + +/** + * This plumber just drops segments at the end of a flush duration instead of handing them off. It is only useful if you want to run + * a real time node without the rest of the Druid cluster. + */ +public class FlushingPlumberSchool implements PlumberSchool +{ + private static final EmittingLogger log = new EmittingLogger(FlushingPlumberSchool.class); + + private final Duration flushDuration; + private final Period windowPeriod; + private final File basePersistDirectory; + private final IndexGranularity segmentGranularity; + + @JacksonInject + @NotNull + private volatile ServiceEmitter emitter; + + @JacksonInject + @NotNull + private volatile QueryRunnerFactoryConglomerate conglomerate = null; + + @JacksonInject + @NotNull + private volatile DataSegmentAnnouncer segmentAnnouncer = null; + + @JacksonInject + @NotNull + @Processing + private volatile ExecutorService queryExecutorService = null; + + private volatile VersioningPolicy versioningPolicy = null; + private volatile RejectionPolicyFactory rejectionPolicyFactory = null; + + @JsonCreator + public FlushingPlumberSchool( + @JsonProperty("flushDuration") Duration flushDuration, + @JsonProperty("windowPeriod") Period windowPeriod, + @JsonProperty("basePersistDirectory") File basePersistDirectory, + @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity + ) + { + this.flushDuration = flushDuration; + this.windowPeriod = windowPeriod; + this.basePersistDirectory = basePersistDirectory; + this.segmentGranularity = segmentGranularity; + this.versioningPolicy = new IntervalStartVersioningPolicy(); + this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory(); + + Preconditions.checkNotNull(flushDuration, "FlushingPlumberSchool requires a flushDuration."); + Preconditions.checkNotNull(windowPeriod, "FlushingPlumberSchool requires a windowPeriod."); + Preconditions.checkNotNull(basePersistDirectory, "FlushingPlumberSchool requires a basePersistDirectory."); + Preconditions.checkNotNull(segmentGranularity, "FlushingPlumberSchool requires a segmentGranularity."); + } + + @Override + + public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics) + { + verifyState(); + + final RejectionPolicy rejectionPolicy = rejectionPolicyFactory.create(windowPeriod); + log.info("Creating plumber using rejectionPolicy[%s]", rejectionPolicy); + + return new FlushingPlumber( + flushDuration, + windowPeriod, + basePersistDirectory, + segmentGranularity, + schema, + metrics, + rejectionPolicy, + emitter, + conglomerate, + segmentAnnouncer, + queryExecutorService, + versioningPolicy + ); + } + + private void verifyState() + { + Preconditions.checkNotNull(conglomerate, "must specify a queryRunnerFactoryConglomerate to do this action."); + Preconditions.checkNotNull(segmentAnnouncer, "must specify a segmentAnnouncer to do this action."); + Preconditions.checkNotNull(emitter, "must specify a serviceEmitter to do this action."); + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/PlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/PlumberSchool.java index f1e186011da..a2495fba178 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/PlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/PlumberSchool.java @@ -27,9 +27,10 @@ import io.druid.segment.realtime.Schema; /** */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") -@JsonSubTypes( - @JsonSubTypes.Type(name = "realtime", value = RealtimePlumberSchool.class) -) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "realtime", value = RealtimePlumberSchool.class), + @JsonSubTypes.Type(name = "flushing", value = FlushingPlumberSchool.class) +}) public interface PlumberSchool { /** diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java new file mode 100644 index 00000000000..ef9d0b6ba0b --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -0,0 +1,706 @@ +package io.druid.segment.realtime.plumber; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.metamx.common.Pair; +import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.common.guava.FunctionalIterable; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.client.DruidServer; +import io.druid.client.ServerView; +import io.druid.common.guava.ThreadRenamingCallable; +import io.druid.common.guava.ThreadRenamingRunnable; +import io.druid.query.MetricsEmittingQueryRunner; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryRunnerFactoryConglomerate; +import io.druid.query.QueryToolChest; +import io.druid.query.SegmentDescriptor; +import io.druid.query.spec.SpecificSegmentQueryRunner; +import io.druid.query.spec.SpecificSegmentSpec; +import io.druid.segment.IndexGranularity; +import io.druid.segment.IndexIO; +import io.druid.segment.IndexMerger; +import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexSegment; +import io.druid.segment.Segment; +import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.FireHydrant; +import io.druid.segment.realtime.Schema; +import io.druid.segment.realtime.SegmentPublisher; +import io.druid.server.coordination.DataSegmentAnnouncer; +import io.druid.timeline.DataSegment; +import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.VersionedIntervalTimeline; +import io.druid.timeline.partition.SingleElementPartitionChunk; +import org.apache.commons.io.FileUtils; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Interval; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +/** + */ +public class RealtimePlumber implements Plumber +{ + private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class); + + private final Period windowPeriod; + private final File basePersistDirectory; + private final IndexGranularity segmentGranularity; + private final Schema schema; + private final FireDepartmentMetrics metrics; + private final RejectionPolicy rejectionPolicy; + private final ServiceEmitter emitter; + private final QueryRunnerFactoryConglomerate conglomerate; + private final DataSegmentAnnouncer segmentAnnouncer; + private final ExecutorService queryExecutorService; + private final VersioningPolicy versioningPolicy; + private final DataSegmentPusher dataSegmentPusher; + private final SegmentPublisher segmentPublisher; + private final ServerView serverView; + + + private final Object handoffCondition = new Object(); + private final Map sinks = Maps.newConcurrentMap(); + private final VersionedIntervalTimeline sinkTimeline = new VersionedIntervalTimeline( + String.CASE_INSENSITIVE_ORDER + ); + + private volatile boolean shuttingDown = false; + private volatile boolean stopped = false; + private volatile ExecutorService persistExecutor = null; + private volatile ScheduledExecutorService scheduledExecutor = null; + + public RealtimePlumber( + Period windowPeriod, + File basePersistDirectory, + IndexGranularity segmentGranularity, + Schema schema, + FireDepartmentMetrics metrics, + RejectionPolicy rejectionPolicy, + ServiceEmitter emitter, + QueryRunnerFactoryConglomerate conglomerate, + DataSegmentAnnouncer segmentAnnouncer, + ExecutorService queryExecutorService, + VersioningPolicy versioningPolicy, + DataSegmentPusher dataSegmentPusher, + SegmentPublisher segmentPublisher, + ServerView serverView + ) + { + this.windowPeriod = windowPeriod; + this.basePersistDirectory = basePersistDirectory; + this.segmentGranularity = segmentGranularity; + this.schema = schema; + this.metrics = metrics; + this.rejectionPolicy = rejectionPolicy; + this.emitter = emitter; + this.conglomerate = conglomerate; + this.segmentAnnouncer = segmentAnnouncer; + this.queryExecutorService = queryExecutorService; + this.versioningPolicy = versioningPolicy; + this.dataSegmentPusher = dataSegmentPusher; + this.segmentPublisher = segmentPublisher; + this.serverView = serverView; + } + + public Schema getSchema() + { + return schema; + } + + public ScheduledExecutorService getScheduledExecutor() + { + return scheduledExecutor; + } + + public Map getSinks() + { + return sinks; + } + + @Override + public void startJob() + { + computeBaseDir(schema).mkdirs(); + initializeExecutors(); + bootstrapSinksFromDisk(); + registerServerViewCallback(); + startPersistThread(); + } + + @Override + public Sink getSink(long timestamp) + { + if (!rejectionPolicy.accept(timestamp)) { + return null; + } + + final long truncatedTime = segmentGranularity.truncate(timestamp); + + Sink retVal = sinks.get(truncatedTime); + + if (retVal == null) { + final Interval sinkInterval = new Interval( + new DateTime(truncatedTime), + segmentGranularity.increment(new DateTime(truncatedTime)) + ); + + retVal = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval)); + + try { + segmentAnnouncer.announceSegment(retVal.getSegment()); + sinks.put(truncatedTime, retVal); + sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), new SingleElementPartitionChunk(retVal)); + } + catch (IOException e) { + log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource()) + .addData("interval", retVal.getInterval()) + .emit(); + } + } + + return retVal; + } + + @Override + public QueryRunner getQueryRunner(final Query query) + { + final QueryRunnerFactory> factory = conglomerate.findFactory(query); + final QueryToolChest> toolchest = factory.getToolchest(); + + final Function, ServiceMetricEvent.Builder> builderFn = + new Function, ServiceMetricEvent.Builder>() + { + + @Override + public ServiceMetricEvent.Builder apply(@Nullable Query input) + { + return toolchest.makeMetricBuilder(query); + } + }; + + List> querySinks = Lists.newArrayList(); + for (Interval interval : query.getIntervals()) { + querySinks.addAll(sinkTimeline.lookup(interval)); + } + + return toolchest.mergeResults( + factory.mergeRunners( + queryExecutorService, + FunctionalIterable + .create(querySinks) + .transform( + new Function, QueryRunner>() + { + @Override + public QueryRunner apply(TimelineObjectHolder holder) + { + final Sink theSink = holder.getObject().getChunk(0).getObject(); + return new SpecificSegmentQueryRunner( + new MetricsEmittingQueryRunner( + emitter, + builderFn, + factory.mergeRunners( + MoreExecutors.sameThreadExecutor(), + Iterables.transform( + theSink, + new Function>() + { + @Override + public QueryRunner apply(FireHydrant input) + { + return factory.createRunner(input.getSegment()); + } + } + ) + ) + ), + new SpecificSegmentSpec( + new SegmentDescriptor( + holder.getInterval(), + theSink.getSegment().getVersion(), + theSink.getSegment().getShardSpec().getPartitionNum() + ) + ) + ); + } + } + ) + ) + ); + } + + @Override + public void persist(final Runnable commitRunnable) + { + final List> indexesToPersist = Lists.newArrayList(); + for (Sink sink : sinks.values()) { + if (sink.swappable()) { + indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval())); + } + } + + log.info("Submitting persist runnable for dataSource[%s]", schema.getDataSource()); + + persistExecutor.execute( + new ThreadRenamingRunnable(String.format("%s-incremental-persist", schema.getDataSource())) + { + @Override + public void doRun() + { + for (Pair pair : indexesToPersist) { + metrics.incrementRowOutputCount(persistHydrant(pair.lhs, schema, pair.rhs)); + } + commitRunnable.run(); + } + } + ); + } + + // Submits persist-n-merge task for a Sink to the persistExecutor + private void persistAndMerge(final long truncatedTime, final Sink sink) + { + final String threadName = String.format( + "%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(truncatedTime) + ); + persistExecutor.execute( + new ThreadRenamingRunnable(threadName) + { + @Override + public void doRun() + { + final Interval interval = sink.getInterval(); + + for (FireHydrant hydrant : sink) { + if (!hydrant.hasSwapped()) { + log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink); + final int rowCount = persistHydrant(hydrant, schema, interval); + metrics.incrementRowOutputCount(rowCount); + } + } + + final File mergedTarget = new File(computePersistDir(schema, interval), "merged"); + if (mergedTarget.exists()) { + log.info("Skipping already-merged sink: %s", sink); + return; + } + + File mergedFile = null; + try { + List indexes = Lists.newArrayList(); + for (FireHydrant fireHydrant : sink) { + Segment segment = fireHydrant.getSegment(); + final QueryableIndex queryableIndex = segment.asQueryableIndex(); + log.info("Adding hydrant[%s]", fireHydrant); + indexes.add(queryableIndex); + } + + mergedFile = IndexMerger.mergeQueryableIndex( + indexes, + schema.getAggregators(), + mergedTarget + ); + + QueryableIndex index = IndexIO.loadIndex(mergedFile); + + DataSegment segment = dataSegmentPusher.push( + mergedFile, + sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions())) + ); + + segmentPublisher.publishSegment(segment); + } + catch (IOException e) { + log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource()) + .addData("interval", interval) + .emit(); + if (shuttingDown) { + // We're trying to shut down, and this segment failed to push. Let's just get rid of it. + abandonSegment(truncatedTime, sink); + } + } + + if (mergedFile != null) { + try { + log.info("Deleting Index File[%s]", mergedFile); + FileUtils.deleteDirectory(mergedFile); + } + catch (IOException e) { + log.warn(e, "Error deleting directory[%s]", mergedFile); + } + } + } + } + ); + } + + @Override + public void finishJob() + { + log.info("Shutting down..."); + + shuttingDown = true; + + for (final Map.Entry entry : sinks.entrySet()) { + persistAndMerge(entry.getKey(), entry.getValue()); + } + + while (!sinks.isEmpty()) { + try { + log.info( + "Cannot shut down yet! Sinks remaining: %s", + Joiner.on(", ").join( + Iterables.transform( + sinks.values(), + new Function() + { + @Override + public String apply(Sink input) + { + return input.getSegment().getIdentifier(); + } + } + ) + ) + ); + + synchronized (handoffCondition) { + while (!sinks.isEmpty()) { + handoffCondition.wait(); + } + } + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + + shutdownScheduledExecutor(); + + stopped = true; + } + + protected void initializeExecutors() + { + if (persistExecutor == null) { + persistExecutor = Executors.newFixedThreadPool( + 1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("plumber_persist_%d") + .build() + ); + } + if (scheduledExecutor == null) { + scheduledExecutor = Executors.newScheduledThreadPool( + 1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("plumber_scheduled_%d") + .build() + ); + } + } + + protected void shutdownScheduledExecutor() + { + // scheduledExecutor is shutdown here, but persistExecutor is shutdown when the + // ServerView sends it a new segment callback + if (scheduledExecutor != null) { + scheduledExecutor.shutdown(); + } + } + + protected void bootstrapSinksFromDisk() + { + File baseDir = computeBaseDir(schema); + if (baseDir == null || !baseDir.exists()) { + return; + } + + File[] files = baseDir.listFiles(); + if (files == null) { + return; + } + + for (File sinkDir : files) { + Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/")); + + //final File[] sinkFiles = sinkDir.listFiles(); + // To avoid reading and listing of "merged" dir + final File[] sinkFiles = sinkDir.listFiles( + new FilenameFilter() + { + @Override + public boolean accept(File dir, String fileName) + { + return !(Ints.tryParse(fileName) == null); + } + } + ); + Arrays.sort( + sinkFiles, + new Comparator() + { + @Override + public int compare(File o1, File o2) + { + try { + return Ints.compare(Integer.parseInt(o1.getName()), Integer.parseInt(o2.getName())); + } + catch (NumberFormatException e) { + log.error(e, "Couldn't compare as numbers? [%s][%s]", o1, o2); + return o1.compareTo(o2); + } + } + } + ); + + try { + List hydrants = Lists.newArrayList(); + for (File segmentDir : sinkFiles) { + log.info("Loading previously persisted segment at [%s]", segmentDir); + + // Although this has been tackled at start of this method. + // Just a doubly-check added to skip "merged" dir. from being added to hydrants + // If 100% sure that this is not needed, this check can be removed. + if (Ints.tryParse(segmentDir.getName()) == null) { + continue; + } + + hydrants.add( + new FireHydrant( + new QueryableIndexSegment(null, IndexIO.loadIndex(segmentDir)), + Integer.parseInt(segmentDir.getName()) + ) + ); + } + + Sink currSink = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval), hydrants); + sinks.put(sinkInterval.getStartMillis(), currSink); + sinkTimeline.add( + currSink.getInterval(), + currSink.getVersion(), + new SingleElementPartitionChunk(currSink) + ); + + segmentAnnouncer.announceSegment(currSink.getSegment()); + } + catch (IOException e) { + log.makeAlert(e, "Problem loading sink[%s] from disk.", schema.getDataSource()) + .addData("interval", sinkInterval) + .emit(); + } + } + } + + protected void startPersistThread() + { + final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis(); + final long windowMillis = windowPeriod.toStandardDuration().getMillis(); + + log.info( + "Expect to run at [%s]", + new DateTime().plus( + new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis) + ) + ); + + ScheduledExecutors + .scheduleAtFixedRate( + scheduledExecutor, + new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis), + new Duration(truncatedNow, segmentGranularity.increment(truncatedNow)), + new ThreadRenamingCallable( + String.format( + "%s-overseer-%d", + schema.getDataSource(), + schema.getShardSpec().getPartitionNum() + ) + ) + { + @Override + public ScheduledExecutors.Signal doCall() + { + if (stopped) { + log.info("Stopping merge-n-push overseer thread"); + return ScheduledExecutors.Signal.STOP; + } + + log.info("Starting merge and push."); + + long minTimestamp = segmentGranularity.truncate( + rejectionPolicy.getCurrMaxTime().minus(windowMillis) + ).getMillis(); + + List> sinksToPush = Lists.newArrayList(); + for (Map.Entry entry : sinks.entrySet()) { + final Long intervalStart = entry.getKey(); + if (intervalStart < minTimestamp) { + log.info("Adding entry[%s] for merge and push.", entry); + sinksToPush.add(entry); + } + } + + for (final Map.Entry entry : sinksToPush) { + persistAndMerge(entry.getKey(), entry.getValue()); + } + + if (stopped) { + log.info("Stopping merge-n-push overseer thread"); + return ScheduledExecutors.Signal.STOP; + } else { + return ScheduledExecutors.Signal.REPEAT; + } + } + } + ); + } + + /** + * Unannounces a given sink and removes all local references to it. + */ + protected void abandonSegment(final long truncatedTime, final Sink sink) + { + try { + segmentAnnouncer.unannounceSegment(sink.getSegment()); + FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval())); + log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier()); + sinks.remove(truncatedTime); + sinkTimeline.remove( + sink.getInterval(), + sink.getVersion(), + new SingleElementPartitionChunk<>(sink) + ); + synchronized (handoffCondition) { + handoffCondition.notifyAll(); + } + } + catch (IOException e) { + log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource()) + .addData("interval", sink.getInterval()) + .emit(); + } + } + + protected File computeBaseDir(Schema schema) + { + return new File(basePersistDirectory, schema.getDataSource()); + } + + protected File computePersistDir(Schema schema, Interval interval) + { + return new File(computeBaseDir(schema), interval.toString().replace("/", "_")); + } + + /** + * Persists the given hydrant and returns the number of rows persisted + * + * @param indexToPersist + * @param schema + * @param interval + * + * @return the number of rows persisted + */ + protected int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval) + { + if (indexToPersist.hasSwapped()) { + log.info( + "DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.", + schema.getDataSource(), interval, indexToPersist + ); + return 0; + } + + log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", schema.getDataSource(), interval, indexToPersist); + try { + int numRows = indexToPersist.getIndex().size(); + + File persistedFile = IndexMerger.persist( + indexToPersist.getIndex(), + new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())) + ); + + indexToPersist.swapSegment(new QueryableIndexSegment(null, IndexIO.loadIndex(persistedFile))); + + return numRows; + } + catch (IOException e) { + log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource()) + .addData("interval", interval) + .addData("count", indexToPersist.getCount()) + .emit(); + + throw Throwables.propagate(e); + } + } + + private void registerServerViewCallback() + { + serverView.registerSegmentCallback( + persistExecutor, + new ServerView.BaseSegmentCallback() + { + @Override + public ServerView.CallbackAction segmentAdded(DruidServer server, DataSegment segment) + { + if (stopped) { + log.info("Unregistering ServerViewCallback"); + persistExecutor.shutdown(); + return ServerView.CallbackAction.UNREGISTER; + } + + if ("realtime".equals(server.getType())) { + return ServerView.CallbackAction.CONTINUE; + } + + log.debug("Checking segment[%s] on server[%s]", segment, server); + if (schema.getDataSource().equals(segment.getDataSource())) { + final Interval interval = segment.getInterval(); + for (Map.Entry entry : sinks.entrySet()) { + final Long sinkKey = entry.getKey(); + if (interval.contains(sinkKey)) { + final Sink sink = entry.getValue(); + log.info("Segment[%s] matches sink[%s] on server[%s]", segment, sink, server); + + final String segmentVersion = segment.getVersion(); + final String sinkVersion = sink.getSegment().getVersion(); + if (segmentVersion.compareTo(sinkVersion) >= 0) { + log.info("Segment version[%s] >= sink version[%s]", segmentVersion, sinkVersion); + abandonSegment(sinkKey, sink); + } + } + } + } + + return ServerView.CallbackAction.CONTINUE; + } + } + ); + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index 35abfcaf866..ecaf10e3e4b 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -22,70 +22,23 @@ package io.druid.segment.realtime.plumber; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Function; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.primitives.Ints; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.metamx.common.Pair; -import com.metamx.common.concurrent.ScheduledExecutors; -import com.metamx.common.guava.FunctionalIterable; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.emitter.service.ServiceMetricEvent; -import io.druid.client.DruidServer; import io.druid.client.ServerView; -import io.druid.common.guava.ThreadRenamingCallable; -import io.druid.common.guava.ThreadRenamingRunnable; import io.druid.guice.annotations.Processing; -import io.druid.query.MetricsEmittingQueryRunner; -import io.druid.query.Query; -import io.druid.query.QueryRunner; -import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; -import io.druid.query.QueryToolChest; -import io.druid.query.SegmentDescriptor; -import io.druid.query.spec.SpecificSegmentQueryRunner; -import io.druid.query.spec.SpecificSegmentSpec; import io.druid.segment.IndexGranularity; -import io.druid.segment.IndexIO; -import io.druid.segment.IndexMerger; -import io.druid.segment.QueryableIndex; -import io.druid.segment.QueryableIndexSegment; -import io.druid.segment.Segment; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; -import io.druid.segment.realtime.FireHydrant; import io.druid.segment.realtime.Schema; import io.druid.segment.realtime.SegmentPublisher; import io.druid.server.coordination.DataSegmentAnnouncer; -import io.druid.timeline.DataSegment; -import io.druid.timeline.TimelineObjectHolder; -import io.druid.timeline.VersionedIntervalTimeline; -import io.druid.timeline.partition.SingleElementPartitionChunk; -import org.apache.commons.io.FileUtils; -import org.joda.time.DateTime; -import org.joda.time.Duration; -import org.joda.time.Interval; import org.joda.time.Period; -import javax.annotation.Nullable; import javax.validation.constraints.NotNull; import java.io.File; -import java.io.FilenameFilter; -import java.io.IOException; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; /** */ @@ -96,9 +49,6 @@ public class RealtimePlumberSchool implements PlumberSchool private final Period windowPeriod; private final File basePersistDirectory; private final IndexGranularity segmentGranularity; - private final Object handoffCondition = new Object(); - - private volatile boolean shuttingDown = false; @JacksonInject @NotNull @@ -205,572 +155,22 @@ public class RealtimePlumberSchool implements PlumberSchool final RejectionPolicy rejectionPolicy = rejectionPolicyFactory.create(windowPeriod); log.info("Creating plumber using rejectionPolicy[%s]", rejectionPolicy); - return new Plumber() - { - private volatile boolean stopped = false; - private volatile ExecutorService persistExecutor = null; - private volatile ScheduledExecutorService scheduledExecutor = null; - - private final Map sinks = Maps.newConcurrentMap(); - private final VersionedIntervalTimeline sinkTimeline = new VersionedIntervalTimeline( - String.CASE_INSENSITIVE_ORDER - ); - - @Override - public void startJob() - { - computeBaseDir(schema).mkdirs(); - initializeExecutors(); - bootstrapSinksFromDisk(); - registerServerViewCallback(); - startPersistThread(); - } - - @Override - public Sink getSink(long timestamp) - { - if (!rejectionPolicy.accept(timestamp)) { - return null; - } - - final long truncatedTime = segmentGranularity.truncate(timestamp); - - Sink retVal = sinks.get(truncatedTime); - - if (retVal == null) { - final Interval sinkInterval = new Interval( - new DateTime(truncatedTime), - segmentGranularity.increment(new DateTime(truncatedTime)) - ); - - retVal = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval)); - - try { - segmentAnnouncer.announceSegment(retVal.getSegment()); - sinks.put(truncatedTime, retVal); - sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), new SingleElementPartitionChunk(retVal)); - } - catch (IOException e) { - log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource()) - .addData("interval", retVal.getInterval()) - .emit(); - } - } - - return retVal; - } - - @Override - public QueryRunner getQueryRunner(final Query query) - { - final QueryRunnerFactory> factory = conglomerate.findFactory(query); - final QueryToolChest> toolchest = factory.getToolchest(); - - final Function, ServiceMetricEvent.Builder> builderFn = - new Function, ServiceMetricEvent.Builder>() - { - - @Override - public ServiceMetricEvent.Builder apply(@Nullable Query input) - { - return toolchest.makeMetricBuilder(query); - } - }; - - List> querySinks = Lists.newArrayList(); - for (Interval interval : query.getIntervals()) { - querySinks.addAll(sinkTimeline.lookup(interval)); - } - - return toolchest.mergeResults( - factory.mergeRunners( - queryExecutorService, - FunctionalIterable - .create(querySinks) - .transform( - new Function, QueryRunner>() - { - @Override - public QueryRunner apply(TimelineObjectHolder holder) - { - final Sink theSink = holder.getObject().getChunk(0).getObject(); - return new SpecificSegmentQueryRunner( - new MetricsEmittingQueryRunner( - emitter, - builderFn, - factory.mergeRunners( - MoreExecutors.sameThreadExecutor(), - Iterables.transform( - theSink, - new Function>() - { - @Override - public QueryRunner apply(FireHydrant input) - { - return factory.createRunner(input.getSegment()); - } - } - ) - ) - ), - new SpecificSegmentSpec( - new SegmentDescriptor( - holder.getInterval(), - theSink.getSegment().getVersion(), - theSink.getSegment().getShardSpec().getPartitionNum() - ) - ) - ); - } - } - ) - ) - ); - } - - @Override - public void persist(final Runnable commitRunnable) - { - final List> indexesToPersist = Lists.newArrayList(); - for (Sink sink : sinks.values()) { - if (sink.swappable()) { - indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval())); - } - } - - log.info("Submitting persist runnable for dataSource[%s]", schema.getDataSource()); - - persistExecutor.execute( - new ThreadRenamingRunnable(String.format("%s-incremental-persist", schema.getDataSource())) - { - @Override - public void doRun() - { - for (Pair pair : indexesToPersist) { - metrics.incrementRowOutputCount(persistHydrant(pair.lhs, schema, pair.rhs)); - } - commitRunnable.run(); - } - } - ); - } - - // Submits persist-n-merge task for a Sink to the persistExecutor - private void persistAndMerge(final long truncatedTime, final Sink sink) - { - final String threadName = String.format( - "%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(truncatedTime) - ); - persistExecutor.execute( - new ThreadRenamingRunnable(threadName) - { - @Override - public void doRun() - { - final Interval interval = sink.getInterval(); - - for (FireHydrant hydrant : sink) { - if (!hydrant.hasSwapped()) { - log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink); - final int rowCount = persistHydrant(hydrant, schema, interval); - metrics.incrementRowOutputCount(rowCount); - } - } - - final File mergedTarget = new File(computePersistDir(schema, interval), "merged"); - if (mergedTarget.exists()) { - log.info("Skipping already-merged sink: %s", sink); - return; - } - - File mergedFile = null; - try { - List indexes = Lists.newArrayList(); - for (FireHydrant fireHydrant : sink) { - Segment segment = fireHydrant.getSegment(); - final QueryableIndex queryableIndex = segment.asQueryableIndex(); - log.info("Adding hydrant[%s]", fireHydrant); - indexes.add(queryableIndex); - } - - mergedFile = IndexMerger.mergeQueryableIndex( - indexes, - schema.getAggregators(), - mergedTarget - ); - - QueryableIndex index = IndexIO.loadIndex(mergedFile); - - DataSegment segment = dataSegmentPusher.push( - mergedFile, - sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions())) - ); - - segmentPublisher.publishSegment(segment); - } - catch (IOException e) { - log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource()) - .addData("interval", interval) - .emit(); - if (shuttingDown) { - // We're trying to shut down, and this segment failed to push. Let's just get rid of it. - abandonSegment(truncatedTime, sink); - } - } - - if (mergedFile != null) { - try { - log.info("Deleting Index File[%s]", mergedFile); - FileUtils.deleteDirectory(mergedFile); - } - catch (IOException e) { - log.warn(e, "Error deleting directory[%s]", mergedFile); - } - } - } - } - ); - } - - @Override - public void finishJob() - { - log.info("Shutting down..."); - - shuttingDown = true; - - for (final Map.Entry entry : sinks.entrySet()) { - persistAndMerge(entry.getKey(), entry.getValue()); - } - - while (!sinks.isEmpty()) { - try { - log.info( - "Cannot shut down yet! Sinks remaining: %s", - Joiner.on(", ").join( - Iterables.transform( - sinks.values(), - new Function() - { - @Override - public String apply(Sink input) - { - return input.getSegment().getIdentifier(); - } - } - ) - ) - ); - - synchronized (handoffCondition) { - while (!sinks.isEmpty()) { - handoffCondition.wait(); - } - } - } - catch (InterruptedException e) { - throw Throwables.propagate(e); - } - } - - // scheduledExecutor is shutdown here, but persistExecutor is shutdown when the - // ServerView sends it a new segment callback - if (scheduledExecutor != null) { - scheduledExecutor.shutdown(); - } - - stopped = true; - } - - private void initializeExecutors() - { - if (persistExecutor == null) { - persistExecutor = Executors.newFixedThreadPool( - 1, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("plumber_persist_%d") - .build() - ); - } - if (scheduledExecutor == null) { - scheduledExecutor = Executors.newScheduledThreadPool( - 1, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("plumber_scheduled_%d") - .build() - ); - } - } - - private void bootstrapSinksFromDisk() - { - File baseDir = computeBaseDir(schema); - if (baseDir == null || !baseDir.exists()) { - return; - } - - File[] files = baseDir.listFiles(); - if (files == null) { - return; - } - - for (File sinkDir : files) { - Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/")); - - //final File[] sinkFiles = sinkDir.listFiles(); - // To avoid reading and listing of "merged" dir - final File[] sinkFiles = sinkDir.listFiles( - new FilenameFilter() - { - @Override - public boolean accept(File dir, String fileName) - { - return !(Ints.tryParse(fileName) == null); - } - } - ); - Arrays.sort( - sinkFiles, - new Comparator() - { - @Override - public int compare(File o1, File o2) - { - try { - return Ints.compare(Integer.parseInt(o1.getName()), Integer.parseInt(o2.getName())); - } - catch (NumberFormatException e) { - log.error(e, "Couldn't compare as numbers? [%s][%s]", o1, o2); - return o1.compareTo(o2); - } - } - } - ); - - try { - List hydrants = Lists.newArrayList(); - for (File segmentDir : sinkFiles) { - log.info("Loading previously persisted segment at [%s]", segmentDir); - - // Although this has been tackled at start of this method. - // Just a doubly-check added to skip "merged" dir. from being added to hydrants - // If 100% sure that this is not needed, this check can be removed. - if (Ints.tryParse(segmentDir.getName()) == null) { - continue; - } - - hydrants.add( - new FireHydrant( - new QueryableIndexSegment(null, IndexIO.loadIndex(segmentDir)), - Integer.parseInt(segmentDir.getName()) - ) - ); - } - - Sink currSink = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval), hydrants); - sinks.put(sinkInterval.getStartMillis(), currSink); - sinkTimeline.add( - currSink.getInterval(), - currSink.getVersion(), - new SingleElementPartitionChunk(currSink) - ); - - segmentAnnouncer.announceSegment(currSink.getSegment()); - } - catch (IOException e) { - log.makeAlert(e, "Problem loading sink[%s] from disk.", schema.getDataSource()) - .addData("interval", sinkInterval) - .emit(); - } - } - } - - private void registerServerViewCallback() - { - serverView.registerSegmentCallback( - persistExecutor, - new ServerView.BaseSegmentCallback() - { - @Override - public ServerView.CallbackAction segmentAdded(DruidServer server, DataSegment segment) - { - if (stopped) { - log.info("Unregistering ServerViewCallback"); - persistExecutor.shutdown(); - return ServerView.CallbackAction.UNREGISTER; - } - - if ("realtime".equals(server.getType())) { - return ServerView.CallbackAction.CONTINUE; - } - - log.debug("Checking segment[%s] on server[%s]", segment, server); - if (schema.getDataSource().equals(segment.getDataSource())) { - final Interval interval = segment.getInterval(); - for (Map.Entry entry : sinks.entrySet()) { - final Long sinkKey = entry.getKey(); - if (interval.contains(sinkKey)) { - final Sink sink = entry.getValue(); - log.info("Segment[%s] matches sink[%s] on server[%s]", segment, sink, server); - - final String segmentVersion = segment.getVersion(); - final String sinkVersion = sink.getSegment().getVersion(); - if (segmentVersion.compareTo(sinkVersion) >= 0) { - log.info("Segment version[%s] >= sink version[%s]", segmentVersion, sinkVersion); - abandonSegment(sinkKey, sink); - } - } - } - } - - return ServerView.CallbackAction.CONTINUE; - } - } - ); - } - - private void startPersistThread() - { - final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis(); - final long windowMillis = windowPeriod.toStandardDuration().getMillis(); - - log.info( - "Expect to run at [%s]", - new DateTime().plus( - new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis) - ) - ); - - ScheduledExecutors - .scheduleAtFixedRate( - scheduledExecutor, - new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis), - new Duration(truncatedNow, segmentGranularity.increment(truncatedNow)), - new ThreadRenamingCallable( - String.format( - "%s-overseer-%d", - schema.getDataSource(), - schema.getShardSpec().getPartitionNum() - ) - ) - { - @Override - public ScheduledExecutors.Signal doCall() - { - if (stopped) { - log.info("Stopping merge-n-push overseer thread"); - return ScheduledExecutors.Signal.STOP; - } - - log.info("Starting merge and push."); - - long minTimestamp = segmentGranularity.truncate( - rejectionPolicy.getCurrMaxTime().minus(windowMillis) - ).getMillis(); - - List> sinksToPush = Lists.newArrayList(); - for (Map.Entry entry : sinks.entrySet()) { - final Long intervalStart = entry.getKey(); - if (intervalStart < minTimestamp) { - log.info("Adding entry[%s] for merge and push.", entry); - sinksToPush.add(entry); - } - } - - for (final Map.Entry entry : sinksToPush) { - persistAndMerge(entry.getKey(), entry.getValue()); - } - - if (stopped) { - log.info("Stopping merge-n-push overseer thread"); - return ScheduledExecutors.Signal.STOP; - } else { - return ScheduledExecutors.Signal.REPEAT; - } - } - } - ); - } - - /** - * Unannounces a given sink and removes all local references to it. - */ - private void abandonSegment(final long truncatedTime, final Sink sink) { - try { - segmentAnnouncer.unannounceSegment(sink.getSegment()); - FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval())); - log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier()); - sinks.remove(truncatedTime); - sinkTimeline.remove( - sink.getInterval(), - sink.getVersion(), - new SingleElementPartitionChunk<>(sink) - ); - synchronized (handoffCondition) { - handoffCondition.notifyAll(); - } - } - catch (IOException e) { - log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource()) - .addData("interval", sink.getInterval()) - .emit(); - } - } - }; - } - - private File computeBaseDir(Schema schema) - { - return new File(basePersistDirectory, schema.getDataSource()); - } - - private File computePersistDir(Schema schema, Interval interval) - { - return new File(computeBaseDir(schema), interval.toString().replace("/", "_")); - } - - /** - * Persists the given hydrant and returns the number of rows persisted - * - * @param indexToPersist - * @param schema - * @param interval - * - * @return the number of rows persisted - */ - private int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval) - { - if (indexToPersist.hasSwapped()) { - log.info( - "DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.", - schema.getDataSource(), interval, indexToPersist - ); - return 0; - } - - log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", schema.getDataSource(), interval, indexToPersist); - try { - int numRows = indexToPersist.getIndex().size(); - - File persistedFile = IndexMerger.persist( - indexToPersist.getIndex(), - new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())) - ); - - indexToPersist.swapSegment(new QueryableIndexSegment(null, IndexIO.loadIndex(persistedFile))); - - return numRows; - } - catch (IOException e) { - log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource()) - .addData("interval", interval) - .addData("count", indexToPersist.getCount()) - .emit(); - - throw Throwables.propagate(e); - } + return new RealtimePlumber( + windowPeriod, + basePersistDirectory, + segmentGranularity, + schema, + metrics, + rejectionPolicy, + emitter, + conglomerate, + segmentAnnouncer, + queryExecutorService, + versioningPolicy, + dataSegmentPusher, + segmentPublisher, + serverView + ); } private void verifyState()