diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorIngestionSpec.java b/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorIngestionSpec.java
new file mode 100644
index 00000000000..3716aef3219
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorIngestionSpec.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.index;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.IngestionSpec;
+import io.druid.segment.indexing.RealtimeIOConfig;
+
+public class RealtimeAppenderatorIngestionSpec extends IngestionSpec<RealtimeIOConfig, RealtimeAppenderatorTuningConfig>
+{
+
+  @JsonCreator
+  public RealtimeAppenderatorIngestionSpec(
+      @JsonProperty("dataSchema") DataSchema dataSchema,
+      @JsonProperty("ioConfig") RealtimeIOConfig ioConfig,
+      @JsonProperty("tuningConfig") RealtimeAppenderatorTuningConfig tuningConfig
+  )
+  {
+    super(dataSchema, ioConfig, tuningConfig);
+  }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
new file mode 100644
index 00000000000..a7084f69d43
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.indexing.common.index;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import io.druid.segment.IndexSpec;
+import io.druid.segment.indexing.TuningConfig;
+import io.druid.segment.realtime.appenderator.AppenderatorConfig;
+import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import io.druid.timeline.partition.NoneShardSpec;
+import io.druid.timeline.partition.ShardSpec;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+@JsonTypeName("realtime_appenderator")
+public class RealtimeAppenderatorTuningConfig implements TuningConfig, AppenderatorConfig
+{
+  private static final int defaultMaxRowsInMemory = 75000;
+  private static final int defaultMaxRowsPerSegment = 5_000_000;
+  private static final Period defaultIntermediatePersistPeriod = new Period("PT10M");
+  private static final int defaultMaxPendingPersists = 0;
+  private static final ShardSpec defaultShardSpec = NoneShardSpec.instance();
+  private static final IndexSpec defaultIndexSpec = new IndexSpec();
+  private static final Boolean defaultReportParseExceptions = Boolean.FALSE;
+  private static final long defaultPublishAndHandoffTimeout = 0;
+  private static final long defaultAlertTimeout = 0;
+
+  private static File createNewBasePersistDirectory()
+  {
+    return Files.createTempDir();
+  }
+
+  private final int maxRowsInMemory;
+  private final int maxRowsPerSegment;
+  private final Period intermediatePersistPeriod;
+  private final File basePersistDirectory;
+  private final int maxPendingPersists;
+  private final ShardSpec shardSpec;
+  private final IndexSpec indexSpec;
+  private final boolean reportParseExceptions;
+  private final long publishAndHandoffTimeout;
+  private final long alertTimeout;
+  @Nullable
+  private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
+
+  @JsonCreator
+  public RealtimeAppenderatorTuningConfig(
+      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
+      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+      @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
+      @JsonProperty("basePersistDirectory") File basePersistDirectory,
+      @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
+      @JsonProperty("shardSpec") ShardSpec shardSpec,
+      @JsonProperty("indexSpec") IndexSpec indexSpec,
+      @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
+      @JsonProperty("publishAndHandoffTimeout") Long publishAndHandoffTimeout,
+      @JsonProperty("alertTimeout") Long alertTimeout,
+      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+  )
+  {
+    this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
+    this.maxRowsPerSegment = maxRowsPerSegment == null ? defaultMaxRowsPerSegment : maxRowsPerSegment;
+    this.intermediatePersistPeriod = intermediatePersistPeriod == null
+                                     ? defaultIntermediatePersistPeriod
+                                     : intermediatePersistPeriod;
+    this.basePersistDirectory = basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory;
+    this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists;
+    this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec;
+    this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec;
+    this.reportParseExceptions = reportParseExceptions == null
+                                 ? defaultReportParseExceptions
+                                 : reportParseExceptions;
+    this.publishAndHandoffTimeout = publishAndHandoffTimeout == null
+                                    ? defaultPublishAndHandoffTimeout
+                                    : publishAndHandoffTimeout;
+    Preconditions.checkArgument(this.publishAndHandoffTimeout >= 0, "publishAndHandoffTimeout must be >= 0");
+
+    this.alertTimeout = alertTimeout == null ? defaultAlertTimeout : alertTimeout;
+    Preconditions.checkArgument(this.alertTimeout >= 0, "alertTimeout must be >= 0");
+    this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
+  }
+
+  @Override
+  @JsonProperty
+  public int getMaxRowsInMemory()
+  {
+    return maxRowsInMemory;
+  }
+
+  @JsonProperty
+  public int getMaxRowsPerSegment()
+  {
+    return maxRowsPerSegment;
+  }
+
+  @Override
+  @JsonProperty
+  public Period getIntermediatePersistPeriod()
+  {
+    return intermediatePersistPeriod;
+  }
+
+  @Override
+  @JsonProperty
+  public File getBasePersistDirectory()
+  {
+    return basePersistDirectory;
+  }
+
+  @Override
+  @JsonProperty
+  public int getMaxPendingPersists()
+  {
+    return maxPendingPersists;
+  }
+
+  @JsonProperty
+  public ShardSpec getShardSpec()
+  {
+    return shardSpec;
+  }
+
+  @Override
+  @JsonProperty
+  public IndexSpec getIndexSpec()
+  {
+    return indexSpec;
+  }
+
+  @Override
+  @JsonProperty
+  public boolean isReportParseExceptions()
+  {
+    return reportParseExceptions;
+  }
+
+  @JsonProperty
+  public long getPublishAndHandoffTimeout()
+  {
+    return publishAndHandoffTimeout;
+  }
+
+  @JsonProperty
+  public long getAlertTimeout()
+  {
+    return alertTimeout;
+  }
+
+  @Override
+  @JsonProperty
+  @Nullable
+  public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
+  {
+    return segmentWriteOutMediumFactory;
+  }
+
+  public RealtimeAppenderatorTuningConfig withBasePersistDirectory(File dir)
+  {
+    return new RealtimeAppenderatorTuningConfig(
+        maxRowsInMemory,
+        maxRowsPerSegment,
+        intermediatePersistPeriod,
+        dir,
+        maxPendingPersists,
+        shardSpec,
+        indexSpec,
+        reportParseExceptions,
+        publishAndHandoffTimeout,
+        alertTimeout,
+        segmentWriteOutMediumFactory
+    );
+  }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
new file mode 100644
index 00000000000..e117d1138a6
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.druid.data.input.Committer; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.discovery.DiscoveryDruidNode; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.discovery.LookupNodeService; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec; +import io.druid.indexing.common.index.RealtimeAppenderatorTuningConfig; +import io.druid.java.util.common.DateTimes; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.query.DruidMetrics; +import io.druid.query.NoopQueryRunner; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.RealtimeMetricsMonitor; +import io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorDriver; +import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; +import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; +import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; +import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; +import io.druid.segment.realtime.plumber.Committers; +import org.apache.commons.io.FileUtils; + +import java.io.File; +import java.util.Collections; +import java.util.Map; +import java.util.Queue; +import java.util.Random; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class AppenderatorDriverRealtimeIndexTask extends AbstractTask +{ + private static final String CTX_KEY_LOOKUP_TIER = "lookupTier"; + + private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class); + private static final Random random = new Random(); + + private static String makeTaskId(RealtimeAppenderatorIngestionSpec 
spec) + { + final StringBuilder suffix = new StringBuilder(8); + for (int i = 0; i < Ints.BYTES * 2; ++i) { + suffix.append((char) ('a' + ((random.nextInt() >>> (i * 4)) & 0x0F))); + } + return StringUtils.format( + "index_realtime_%s_%d_%s_%s", + spec.getDataSchema().getDataSource(), + spec.getTuningConfig().getShardSpec().getPartitionNum(), + DateTimes.nowUtc(), + suffix + ); + } + + @JsonIgnore + private final RealtimeAppenderatorIngestionSpec spec; + + @JsonIgnore + private final Queue> pendingHandoffs; + + @JsonIgnore + private volatile Appenderator appenderator = null; + + @JsonIgnore + private volatile Firehose firehose = null; + + @JsonIgnore + private volatile FireDepartmentMetrics metrics = null; + + @JsonIgnore + private volatile boolean gracefullyStopped = false; + + @JsonIgnore + private volatile boolean finishingJob = false; + + @JsonIgnore + private volatile Thread runThread = null; + + @JsonCreator + public AppenderatorDriverRealtimeIndexTask( + @JsonProperty("id") String id, + @JsonProperty("resource") TaskResource taskResource, + @JsonProperty("spec") RealtimeAppenderatorIngestionSpec spec, + @JsonProperty("context") Map context + ) + { + super( + id == null ? makeTaskId(spec) : id, + StringUtils.format("index_realtime_appenderator_%s", spec.getDataSchema().getDataSource()), + taskResource, + spec.getDataSchema().getDataSource(), + context + ); + this.spec = spec; + this.pendingHandoffs = new ConcurrentLinkedQueue<>(); + } + + @Override + public int getPriority() + { + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); + } + + @Override + public String getType() + { + return "index_realtime_appenderator"; + } + + @Override + public String getNodeType() + { + return "realtime"; + } + + @Override + public QueryRunner getQueryRunner(Query query) + { + if (appenderator == null) { + // Not yet initialized, no data yet, just return a noop runner. 
+ return new NoopQueryRunner<>(); + } + + return (queryPlus, responseContext) -> queryPlus.run(appenderator, responseContext); + } + + @Override + public boolean isReady(TaskActionClient taskActionClient) + { + return true; + } + + @Override + public TaskStatus run(final TaskToolbox toolbox) throws Exception + { + runThread = Thread.currentThread(); + + setupTimeoutAlert(); + + DataSchema dataSchema = spec.getDataSchema(); + RealtimeAppenderatorTuningConfig tuningConfig = spec.getTuningConfig() + .withBasePersistDirectory(toolbox.getPersistDir()); + + final FireDepartment fireDepartmentForMetrics = new FireDepartment( + dataSchema, new RealtimeIOConfig(null, null, null), null + ); + + final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor( + ImmutableList.of(fireDepartmentForMetrics), + ImmutableMap.of( + DruidMetrics.TASK_ID, new String[]{getId()} + ) + ); + + this.metrics = fireDepartmentForMetrics.getMetrics(); + + Supplier committerSupplier = null; + final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); + + DiscoveryDruidNode discoveryDruidNode = createDiscoveryDruidNode(toolbox); + + appenderator = newAppenderator(dataSchema, tuningConfig, metrics, toolbox); + AppenderatorDriver driver = newDriver(dataSchema, appenderator, toolbox, metrics); + + try { + toolbox.getDataSegmentServerAnnouncer().announce(); + toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode); + + driver.startJob(); + + // Set up metrics emission + toolbox.getMonitorScheduler().addMonitor(metricsMonitor); + + // Firehose temporary directory is automatically removed when this RealtimeIndexTask completes. + FileUtils.forceMkdir(firehoseTempDir); + + // Delay firehose connection to avoid claiming input resources while the plumber is starting up. + final FirehoseFactory firehoseFactory = spec.getIOConfig().getFirehoseFactory(); + final boolean firehoseDrainableByClosing = isFirehoseDrainableByClosing(firehoseFactory); + + int sequenceNumber = 0; + String sequenceName = makeSequenceName(getId(), sequenceNumber); + + final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> { + final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments); + return toolbox.getTaskActionClient().submit(action).isSuccess(); + }; + + // Skip connecting firehose if we've been stopped before we got started. + synchronized (this) { + if (!gracefullyStopped) { + firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir); + committerSupplier = Committers.supplierFromFirehose(firehose); + } + } + + // Time to read data! + while (!gracefullyStopped && firehoseDrainableByClosing && firehose.hasMore()) { + try { + InputRow inputRow = firehose.nextRow(); + + if (inputRow == null) { + log.debug("Discarded null row, considering thrownAway."); + metrics.incrementThrownAway(); + } else { + AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier); + + if (addResult.isOk()) { + if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) { + publishSegments(driver, publisher, committerSupplier, sequenceName); + + sequenceNumber++; + sequenceName = makeSequenceName(getId(), sequenceNumber); + } + } else { + // Failure to allocate segment puts determinism at risk, bail out to be safe. + // May want configurable behavior here at some point. + // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks. 
+ throw new ISE("Could not allocate segment for row with timestamp[%s]", inputRow.getTimestamp()); + } + + metrics.incrementProcessed(); + } + } + catch (ParseException e) { + if (tuningConfig.isReportParseExceptions()) { + throw e; + } else { + log.debug(e, "Discarded row due to exception, considering unparseable."); + metrics.incrementUnparseable(); + } + } + } + + if (!gracefullyStopped) { + synchronized (this) { + if (gracefullyStopped) { + // Someone called stopGracefully after we checked the flag. That's okay, just stop now. + log.info("Gracefully stopping."); + } else { + finishingJob = true; + } + } + + if (finishingJob) { + log.info("Finishing job..."); + // Publish any remaining segments + publishSegments(driver, publisher, committerSupplier, sequenceName); + + waitForSegmentPublishAndHandoff(tuningConfig.getPublishAndHandoffTimeout()); + } + } else if (firehose != null) { + log.info("Task was gracefully stopped, will persist data before exiting"); + + persistAndWait(driver, committerSupplier.get()); + } + } + catch (Throwable e) { + log.makeAlert(e, "Exception aborted realtime processing[%s]", dataSchema.getDataSource()) + .emit(); + throw e; + } + finally { + CloseQuietly.close(firehose); + CloseQuietly.close(appenderator); + CloseQuietly.close(driver); + + toolbox.getMonitorScheduler().removeMonitor(metricsMonitor); + + toolbox.getDataSegmentServerAnnouncer().unannounce(); + toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode); + } + + log.info("Job done!"); + return TaskStatus.success(getId()); + } + + @Override + public boolean canRestore() + { + return true; + } + + @Override + public void stopGracefully() + { + try { + synchronized (this) { + if (!gracefullyStopped) { + gracefullyStopped = true; + if (firehose == null) { + log.info("stopGracefully: Firehose not started yet, so nothing to stop."); + } else if (finishingJob) { + log.info("stopGracefully: Interrupting finishJob."); + runThread.interrupt(); + } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) { + log.info("stopGracefully: Draining firehose."); + firehose.close(); + } else { + log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread."); + runThread.interrupt(); + } + } + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + /** + * Public for tests. + */ + @JsonIgnore + public Firehose getFirehose() + { + return firehose; + } + + /** + * Public for tests. + */ + @JsonIgnore + public FireDepartmentMetrics getMetrics() + { + return metrics; + } + + @JsonProperty("spec") + public RealtimeAppenderatorIngestionSpec getSpec() + { + return spec; + } + + /** + * Is a firehose from this factory drainable by closing it? If so, we should drain on stopGracefully rather than + * abruptly stopping. + * + * This is a hack to get around the fact that the Firehose and FirehoseFactory interfaces do not help us do this. + * + * Protected for tests. 
+ */ + protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) + { + return firehoseFactory instanceof EventReceiverFirehoseFactory + || (firehoseFactory instanceof TimedShutoffFirehoseFactory + && isFirehoseDrainableByClosing(((TimedShutoffFirehoseFactory) firehoseFactory).getDelegateFactory())) + || (firehoseFactory instanceof ClippedFirehoseFactory + && isFirehoseDrainableByClosing(((ClippedFirehoseFactory) firehoseFactory).getDelegate())); + } + + private void setupTimeoutAlert() + { + if (spec.getTuningConfig().getAlertTimeout() > 0) { + Timer timer = new Timer("RealtimeIndexTask-Timer", true); + timer.schedule( + new TimerTask() + { + @Override + public void run() + { + log.makeAlert( + "RealtimeIndexTask for dataSource [%s] hasn't finished in configured time [%d] ms.", + spec.getDataSchema().getDataSource(), + spec.getTuningConfig().getAlertTimeout() + ).emit(); + } + }, + spec.getTuningConfig().getAlertTimeout() + ); + } + } + + private void publishSegments( + AppenderatorDriver driver, + TransactionalSegmentPublisher publisher, + Supplier committerSupplier, + String sequenceName + ) + { + ListenableFuture publishFuture = driver.publish( + publisher, + committerSupplier.get(), + Collections.singletonList(sequenceName) + ); + + ListenableFuture handoffFuture = Futures.transform(publishFuture, driver::registerHandoff); + + pendingHandoffs.add(handoffFuture); + } + + private void waitForSegmentPublishAndHandoff(long timeout) throws InterruptedException, ExecutionException, + TimeoutException + { + if (!pendingHandoffs.isEmpty()) { + ListenableFuture allHandoffs = Futures.allAsList(pendingHandoffs); + log.info("Waiting for handoffs"); + + + if (timeout > 0) { + allHandoffs.get(timeout, TimeUnit.MILLISECONDS); + } else { + allHandoffs.get(); + } + } + } + + private void persistAndWait(AppenderatorDriver driver, Committer committer) + { + try { + final CountDownLatch persistLatch = new CountDownLatch(1); + driver.persist( + new Committer() + { + @Override + public Object getMetadata() + { + return committer.getMetadata(); + } + + @Override + public void run() + { + try { + committer.run(); + } + finally { + persistLatch.countDown(); + } + } + } + ); + persistLatch.await(); + } + catch (InterruptedException e) { + log.debug(e, "Interrupted while finishing the job"); + } + catch (Exception e) { + log.makeAlert(e, "Failed to finish realtime task").emit(); + throw e; + } + } + + private DiscoveryDruidNode createDiscoveryDruidNode(TaskToolbox toolbox) + { + LookupNodeService lookupNodeService = getContextValue(CTX_KEY_LOOKUP_TIER) == null ? 
+ toolbox.getLookupNodeService() : + new LookupNodeService(getContextValue(CTX_KEY_LOOKUP_TIER)); + return new DiscoveryDruidNode( + toolbox.getDruidNode(), + DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + ImmutableMap.of( + toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(), + lookupNodeService.getName(), lookupNodeService + ) + ); + } + + private static Appenderator newAppenderator( + final DataSchema dataSchema, + final RealtimeAppenderatorTuningConfig tuningConfig, + final FireDepartmentMetrics metrics, + final TaskToolbox toolbox + ) + { + return Appenderators.createRealtime( + dataSchema, + tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()), + metrics, + toolbox.getSegmentPusher(), + toolbox.getObjectMapper(), + toolbox.getIndexIO(), + toolbox.getIndexMergerV9(), + toolbox.getQueryRunnerFactoryConglomerate(), + toolbox.getSegmentAnnouncer(), + toolbox.getEmitter(), + toolbox.getQueryExecutorService(), + toolbox.getCache(), + toolbox.getCacheConfig() + ); + } + + private static AppenderatorDriver newDriver( + final DataSchema dataSchema, + final Appenderator appenderator, + final TaskToolbox toolbox, + final FireDepartmentMetrics metrics + ) + { + return new AppenderatorDriver( + appenderator, + new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema), + toolbox.getSegmentHandoffNotifierFactory(), + new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), + toolbox.getObjectMapper(), + metrics + ); + } + + private static String makeSequenceName(String taskId, int sequenceNumber) + { + return taskId + "_" + sequenceNumber; + } +} + diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 395904e3c45..a7b010c08ab 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -28,7 +28,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.primitives.Ints; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.data.input.Committer; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -45,6 +44,7 @@ import io.druid.indexing.common.actions.TaskActionClient; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.query.DruidMetrics; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index 8e18d725e43..e3eb3e77c5e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -55,6 +55,7 @@ import java.util.Map; @JsonSubTypes.Type(name = "hadoop_convert_segment", value = HadoopConverterTask.class), @JsonSubTypes.Type(name = "hadoop_convert_segment_sub", value = HadoopConverterTask.ConverterSubTask.class), @JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class), + @JsonSubTypes.Type(name = "index_realtime_appenderator", value = AppenderatorDriverRealtimeIndexTask.class), 
@JsonSubTypes.Type(name = "noop", value = NoopTask.class), @JsonSubTypes.Type(name = "version_converter", value = ConvertSegmentBackwardsCompatibleTask.class), // Backwards compat - Deprecated @JsonSubTypes.Type(name = "version_converter_sub", value = ConvertSegmentBackwardsCompatibleTask.SubTask.class), // backwards compat - Deprecated diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java new file mode 100644 index 00000000000..2f1097c3e30 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -0,0 +1,1280 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.task; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.MapCache; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.MapInputRowParser; +import io.druid.data.input.impl.TimeAndDimsParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.discovery.DataNodeService; +import io.druid.discovery.DruidNodeAnnouncer; +import io.druid.discovery.LookupNodeService; +import io.druid.indexer.TaskState; +import io.druid.indexing.common.SegmentLoaderFactory; +import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.TaskToolboxFactory; +import io.druid.indexing.common.TestUtils; +import io.druid.indexing.common.actions.LocalTaskActionClientFactory; +import io.druid.indexing.common.actions.TaskActionClientFactory; +import io.druid.indexing.common.actions.TaskActionToolbox; +import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.config.TaskStorageConfig; +import io.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec; +import io.druid.indexing.common.index.RealtimeAppenderatorTuningConfig; +import io.druid.indexing.overlord.DataSourceMetadata; +import io.druid.indexing.overlord.HeapMemoryTaskStorage; +import 
io.druid.indexing.overlord.SegmentPublishResult; +import io.druid.indexing.overlord.TaskLockbox; +import io.druid.indexing.overlord.TaskStorage; +import io.druid.indexing.overlord.supervisor.SupervisorManager; +import io.druid.indexing.test.TestDataSegmentAnnouncer; +import io.druid.indexing.test.TestDataSegmentKiller; +import io.druid.indexing.test.TestDataSegmentPusher; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.DateTimes; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Pair; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.common.granularity.Granularities; +import io.druid.java.util.common.jackson.JacksonUtils; +import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.emitter.core.NoopEmitter; +import io.druid.java.util.emitter.service.ServiceEmitter; +import io.druid.java.util.metrics.MonitorScheduler; +import io.druid.math.expr.ExprMacroTable; +import io.druid.metadata.EntryExistsException; +import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; +import io.druid.metadata.TestDerbyConnector; +import io.druid.query.DefaultQueryRunnerFactoryConglomerate; +import io.druid.query.Druids; +import io.druid.query.IntervalChunkingQueryRunnerDecorator; +import io.druid.query.Query; +import io.druid.query.QueryPlus; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryRunnerFactoryConglomerate; +import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; +import io.druid.query.Result; +import io.druid.query.SegmentDescriptor; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.filter.DimFilter; +import io.druid.query.filter.SelectorDimFilter; +import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.query.timeseries.TimeseriesQueryEngine; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; +import io.druid.query.timeseries.TimeseriesResultValue; +import io.druid.segment.TestHelper; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.segment.loading.SegmentLoaderLocalCacheManager; +import io.druid.segment.loading.StorageLocationConfig; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; +import io.druid.segment.transform.ExpressionTransform; +import io.druid.segment.transform.TransformSpec; +import io.druid.server.DruidNode; +import io.druid.server.coordination.DataSegmentServerAnnouncer; +import io.druid.server.coordination.ServerType; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.LinearShardSpec; +import io.druid.timeline.partition.NumberedShardSpec; +import org.apache.commons.io.FileUtils; +import org.easymock.EasyMock; +import org.hamcrest.CoreMatchers; +import org.joda.time.DateTime; +import org.joda.time.Period; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import 
org.junit.Test; +import org.junit.internal.matchers.ThrowableCauseMatcher; +import org.junit.internal.matchers.ThrowableMessageMatcher; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +public class AppenderatorDriverRealtimeIndexTaskTest +{ + private static final Logger log = new Logger(AppenderatorDriverRealtimeIndexTaskTest.class); + private static final ServiceEmitter emitter = new ServiceEmitter( + "service", + "host", + new NoopEmitter() + ); + + private static final String FAIL_DIM = "__fail__"; + + private static class TestFirehose implements Firehose + { + private final InputRowParser> parser; + private final List> queue = new LinkedList<>(); + private boolean closed = false; + + public TestFirehose(final InputRowParser> parser) + { + this.parser = parser; + } + + public void addRows(List> rows) + { + synchronized (this) { + queue.addAll(rows); + notifyAll(); + } + } + + @Override + public boolean hasMore() + { + try { + synchronized (this) { + while (queue.isEmpty() && !closed) { + wait(); + } + return !queue.isEmpty(); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + } + + @Override + public InputRow nextRow() + { + synchronized (this) { + final InputRow row = parser.parseBatch(queue.remove(0)).get(0); + if (row != null && row.getRaw(FAIL_DIM) != null) { + throw new ParseException(FAIL_DIM); + } + return row; + } + } + + @Override + public Runnable commit() + { + return () -> {}; + } + + @Override + public void close() throws IOException + { + synchronized (this) { + closed = true; + notifyAll(); + } + } + } + + private static class TestFirehoseFactory implements FirehoseFactory + { + public TestFirehoseFactory() + { + } + + @Override + @SuppressWarnings("unchecked") + public Firehose connect(InputRowParser parser, File temporaryDirectory) throws ParseException + { + return new TestFirehose(parser); + } + } + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + + private DateTime now; + private ListeningExecutorService taskExec; + private Map> handOffCallbacks; + private Collection publishedSegments; + private CountDownLatch segmentLatch; + private CountDownLatch handoffLatch; + private TaskStorage taskStorage; + private TaskLockbox taskLockbox; + private TaskToolboxFactory taskToolboxFactory; + private File baseDir; + + @Before + public void setUp() throws IOException + { + EmittingLogger.registerEmitter(emitter); + emitter.start(); + taskExec = MoreExecutors.listeningDecorator(Execs.singleThreaded("realtime-index-task-test-%d")); + now = DateTimes.nowUtc(); + + TestDerbyConnector derbyConnector = derbyConnectorRule.getConnector(); + 
derbyConnector.createDataSourceTable(); + derbyConnector.createTaskTables(); + derbyConnector.createSegmentTable(); + derbyConnector.createPendingSegmentsTable(); + + baseDir = tempFolder.newFolder(); + makeToolboxFactory(baseDir); + } + + @After + public void tearDown() + { + taskExec.shutdownNow(); + } + + @Test(timeout = 60_000L) + public void testDefaultResource() throws Exception + { + final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null); + Assert.assertEquals(task.getId(), task.getTaskResource().getAvailabilityGroup()); + } + + + @Test(timeout = 60_000L, expected = ExecutionException.class) + public void testHandoffTimeout() throws Exception + { + expectPublishedSegments(1); + final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null, TransformSpec.NONE, true, 100L); + final ListenableFuture statusFuture = runTask(task); + + // Wait for firehose to show up, it starts off null. + while (task.getFirehose() == null) { + Thread.sleep(50); + } + + final TestFirehose firehose = (TestFirehose) task.getFirehose(); + + firehose.addRows( + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1") + ) + ); + + // Stop the firehose, this will drain out existing events. + firehose.close(); + + // handoff would timeout, resulting in exception + statusFuture.get(); + } + + @Test(timeout = 60_000L) + public void testBasics() throws Exception + { + expectPublishedSegments(1); + final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null); + final ListenableFuture statusFuture = runTask(task); + + // Wait for firehose to show up, it starts off null. + while (task.getFirehose() == null) { + Thread.sleep(50); + } + + final TestFirehose firehose = (TestFirehose) task.getFirehose(); + + firehose.addRows( + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"), + ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0) + ) + ); + + // Stop the firehose, this will drain out existing events. + firehose.close(); + + // Wait for publish. + Collection publishedSegments = awaitSegments(); + + // Check metrics. + Assert.assertEquals(2, task.getMetrics().processed()); + Assert.assertEquals(0, task.getMetrics().thrownAway()); + Assert.assertEquals(0, task.getMetrics().unparseable()); + + // Do some queries. + Assert.assertEquals(2, sumMetric(task, null, "rows")); + Assert.assertEquals(3, sumMetric(task, null, "met1")); + + awaitHandoffs(); + + for (DataSegment publishedSegment : publishedSegments) { + Optional>> optional = handOffCallbacks.entrySet().stream() + .filter(e -> e.getKey().equals(new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ))) + .findFirst(); + + Assert.assertTrue( + publishedSegment + " missing from handoff callbacks: " + handOffCallbacks, + optional.isPresent() + ); + Pair executorRunnablePair = optional.get().getValue(); + + // Simulate handoff. + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); + } + handOffCallbacks.clear(); + + // Wait for the task to finish. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); + } + + @Test(timeout = 60_000L) + public void testLateData() throws Exception + { + expectPublishedSegments(1); + final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null); + final ListenableFuture statusFuture = runTask(task); + + // Wait for firehose to show up, it starts off null. 
+ while (task.getFirehose() == null) { + Thread.sleep(50); + } + + final TestFirehose firehose = (TestFirehose) task.getFirehose(); + + firehose.addRows( + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"), + // Data is from 2 days ago, should still be processed + ImmutableMap.of("t", now.minus(new Period("P2D")).getMillis(), "dim2", "bar", "met1", 2.0) + ) + ); + + // Stop the firehose, this will drain out existing events. + firehose.close(); + + // Wait for publish. + Collection publishedSegments = awaitSegments(); + + // Check metrics. + Assert.assertEquals(2, task.getMetrics().processed()); + Assert.assertEquals(0, task.getMetrics().thrownAway()); + Assert.assertEquals(0, task.getMetrics().unparseable()); + + // Do some queries. + Assert.assertEquals(2, sumMetric(task, null, "rows")); + Assert.assertEquals(3, sumMetric(task, null, "met1")); + + awaitHandoffs(); + + for (DataSegment publishedSegment : publishedSegments) { + Optional>> optional = handOffCallbacks.entrySet().stream() + .filter(e -> e.getKey().equals(new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ))) + .findFirst(); + + Assert.assertTrue( + publishedSegment + " missing from handoff callbacks: " + handOffCallbacks, + optional.isPresent() + ); + Pair executorRunnablePair = optional.get().getValue(); + + // Simulate handoff. + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); + } + handOffCallbacks.clear(); + + // Wait for the task to finish. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); + } + + @Test(timeout = 60_000L) + public void testMaxRowsPerSegment() throws Exception + { + // Expect 2 segments as we will hit maxRowsPerSegment + expectPublishedSegments(2); + + final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null); + final ListenableFuture statusFuture = runTask(task); + + // Wait for firehose to show up, it starts off null. + while (task.getFirehose() == null) { + Thread.sleep(50); + } + + final TestFirehose firehose = (TestFirehose) task.getFirehose(); + + // maxRowsPerSegment is 1000 as configured in #makeRealtimeTask + for (int i = 0; i < 2000; i++) { + firehose.addRows( + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo-" + i, "met1", "1") + ) + ); + } + + // Stop the firehose, this will drain out existing events. + firehose.close(); + + // Wait for publish. + Collection publishedSegments = awaitSegments(); + + // Check metrics. + Assert.assertEquals(2000, task.getMetrics().processed()); + Assert.assertEquals(0, task.getMetrics().thrownAway()); + Assert.assertEquals(0, task.getMetrics().unparseable()); + + // Do some queries. + Assert.assertEquals(2000, sumMetric(task, null, "rows")); + Assert.assertEquals(2000, sumMetric(task, null, "met1")); + + awaitHandoffs(); + + for (DataSegment publishedSegment : publishedSegments) { + Optional>> optional = handOffCallbacks.entrySet().stream() + .filter(e -> e.getKey().equals(new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ))) + .findFirst(); + + Assert.assertTrue( + publishedSegment + " missing from handoff callbacks: " + handOffCallbacks, + optional.isPresent() + ); + Pair executorRunnablePair = optional.get().getValue(); + + // Simulate handoff. 
+ executorRunnablePair.lhs.execute(executorRunnablePair.rhs); + } + handOffCallbacks.clear(); + + // Wait for the task to finish. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); + } + + @Test(timeout = 60_000L) + public void testTransformSpec() throws Exception + { + expectPublishedSegments(2); + + final TransformSpec transformSpec = new TransformSpec( + new SelectorDimFilter("dim1", "foo", null), + ImmutableList.of( + new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil()) + ) + ); + final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null, transformSpec, true, 0); + final ListenableFuture statusFuture = runTask(task); + + // Wait for firehose to show up, it starts off null. + while (task.getFirehose() == null) { + Thread.sleep(50); + } + + final TestFirehose firehose = (TestFirehose) task.getFirehose(); + + firehose.addRows( + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"), + ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", 2.0), + ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0) + ) + ); + + // Stop the firehose, this will drain out existing events. + firehose.close(); + + Collection publishedSegments = awaitSegments(); + + // Check metrics. + Assert.assertEquals(2, task.getMetrics().processed()); + Assert.assertEquals(1, task.getMetrics().thrownAway()); + Assert.assertEquals(0, task.getMetrics().unparseable()); + + // Do some queries. + Assert.assertEquals(2, sumMetric(task, null, "rows")); + Assert.assertEquals(2, sumMetric(task, new SelectorDimFilter("dim1t", "foofoo", null), "rows")); + Assert.assertEquals(0, sumMetric(task, new SelectorDimFilter("dim1t", "barbar", null), "rows")); + Assert.assertEquals(3, sumMetric(task, null, "met1")); + + awaitHandoffs(); + + for (DataSegment publishedSegment : publishedSegments) { + Optional>> optional = handOffCallbacks.entrySet().stream() + .filter(e -> e.getKey().equals(new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ))) + .findFirst(); + + Assert.assertTrue( + publishedSegment + " missing from handoff callbacks: " + handOffCallbacks, + optional.isPresent() + ); + Pair executorRunnablePair = optional.get().getValue(); + + // Simulate handoff. + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); + } + handOffCallbacks.clear(); + + // Wait for the task to finish. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); + } + + @Test(timeout = 60_000L) + public void testReportParseExceptionsOnBadMetric() throws Exception + { + expectPublishedSegments(0); + final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null, true); + final ListenableFuture statusFuture = runTask(task); + + // Wait for firehose to show up, it starts off null. 
+ while (task.getFirehose() == null) { + Thread.sleep(50); + } + + final TestFirehose firehose = (TestFirehose) task.getFirehose(); + + firehose.addRows( + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"), + ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "foo"), + ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", "foo"), + ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0) + ) + ); + + // Stop the firehose, this will drain out existing events. + firehose.close(); + + // Wait for the task to finish. + expectedException.expect(ExecutionException.class); + expectedException.expectCause(CoreMatchers.instanceOf(ParseException.class)); + expectedException.expectCause( + ThrowableMessageMatcher.hasMessage( + CoreMatchers.containsString("Encountered parse error for aggregator[met1]") + ) + ); + expectedException.expect( + ThrowableCauseMatcher.hasCause( + ThrowableCauseMatcher.hasCause( + CoreMatchers.allOf( + CoreMatchers.instanceOf(ParseException.class), + ThrowableMessageMatcher.hasMessage( + CoreMatchers.containsString("Unable to parse value[foo] for field[met1]") + ) + ) + ) + ) + ); + statusFuture.get(); + } + + @Test(timeout = 60_000L) + public void testNoReportParseExceptions() throws Exception + { + expectPublishedSegments(1); + + final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null, false); + final ListenableFuture statusFuture = runTask(task); + + // Wait for firehose to show up, it starts off null. + while (task.getFirehose() == null) { + Thread.sleep(50); + } + + final TestFirehose firehose = (TestFirehose) task.getFirehose(); + + firehose.addRows( + Arrays.asList( + // Good row- will be processed. + ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"), + + // Null row- will be thrown away. + null, + + // Bad metric- will count as processed, but that particular metric won't update. + ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "foo"), + + // Bad row- will be unparseable. + ImmutableMap.of("dim1", "foo", "met1", 2.0, FAIL_DIM, "x"), + + // Good row- will be processed. + ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0) + ) + ); + + // Stop the firehose, this will drain out existing events. + firehose.close(); + + // Wait for publish. + Collection publishedSegments = awaitSegments(); + + DataSegment publishedSegment = Iterables.getOnlyElement(publishedSegments); + + // Check metrics. + Assert.assertEquals(3, task.getMetrics().processed()); + Assert.assertEquals(0, task.getMetrics().thrownAway()); + Assert.assertEquals(2, task.getMetrics().unparseable()); + + // Do some queries. + Assert.assertEquals(3, sumMetric(task, null, "rows")); + Assert.assertEquals(3, sumMetric(task, null, "met1")); + + awaitHandoffs(); + + // Simulate handoff. + for (Map.Entry> entry : handOffCallbacks.entrySet()) { + final Pair executorRunnablePair = entry.getValue(); + Assert.assertEquals( + new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ), + entry.getKey() + ); + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); + } + handOffCallbacks.clear(); + + // Wait for the task to finish. 
+ final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); + } + + @Test(timeout = 60_000L) + public void testRestore() throws Exception + { + expectPublishedSegments(0); + + final AppenderatorDriverRealtimeIndexTask task1 = makeRealtimeTask(null); + final DataSegment publishedSegment; + + // First run: + { + final ListenableFuture statusFuture = runTask(task1); + + // Wait for firehose to show up, it starts off null. + while (task1.getFirehose() == null) { + Thread.sleep(50); + } + + final TestFirehose firehose = (TestFirehose) task1.getFirehose(); + + firehose.addRows( + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo") + ) + ); + + // Trigger graceful shutdown. + task1.stopGracefully(); + + // Wait for the task to finish. The status doesn't really matter, but we'll check it anyway. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); + + // Nothing should be published. + Assert.assertTrue(publishedSegments.isEmpty()); + } + + // Second run: + { + expectPublishedSegments(1); + final AppenderatorDriverRealtimeIndexTask task2 = makeRealtimeTask(task1.getId()); + final ListenableFuture statusFuture = runTask(task2); + + // Wait for firehose to show up, it starts off null. + while (task2.getFirehose() == null) { + Thread.sleep(50); + } + + // Do a query, at this point the previous data should be loaded. + Assert.assertEquals(1, sumMetric(task2, null, "rows")); + + final TestFirehose firehose = (TestFirehose) task2.getFirehose(); + + firehose.addRows( + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim2", "bar") + ) + ); + + // Stop the firehose, this will drain out existing events. + firehose.close(); + + Collection publishedSegments = awaitSegments(); + + publishedSegment = Iterables.getOnlyElement(publishedSegments); + + // Do a query. + Assert.assertEquals(2, sumMetric(task2, null, "rows")); + + awaitHandoffs(); + + // Simulate handoff. + for (Map.Entry> entry : handOffCallbacks.entrySet()) { + final Pair executorRunnablePair = entry.getValue(); + Assert.assertEquals( + new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ), + entry.getKey() + ); + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); + } + handOffCallbacks.clear(); + + // Wait for the task to finish. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); + } + } + + @Test(timeout = 60_000L) + public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception + { + final AppenderatorDriverRealtimeIndexTask task1 = makeRealtimeTask(null); + final DataSegment publishedSegment; + + // First run: + { + expectPublishedSegments(1); + final ListenableFuture statusFuture = runTask(task1); + + // Wait for firehose to show up, it starts off null. + while (task1.getFirehose() == null) { + Thread.sleep(50); + } + + final TestFirehose firehose = (TestFirehose) task1.getFirehose(); + + firehose.addRows( + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo") + ) + ); + + // Stop the firehose, this will trigger a finishJob. + firehose.close(); + + Collection publishedSegments = awaitSegments(); + + publishedSegment = Iterables.getOnlyElement(publishedSegments); + + // Do a query. + Assert.assertEquals(1, sumMetric(task1, null, "rows")); + + // Trigger graceful shutdown. 
+ task1.stopGracefully(); + + // Wait for the task to finish. The status doesn't really matter. + while (!statusFuture.isDone()) { + Thread.sleep(50); + } + } + + // Second run: + { + expectPublishedSegments(1); + final AppenderatorDriverRealtimeIndexTask task2 = makeRealtimeTask(task1.getId()); + final ListenableFuture statusFuture = runTask(task2); + + // Wait for firehose to show up, it starts off null. + while (task2.getFirehose() == null) { + Thread.sleep(50); + } + + // Stop the firehose again, this will start another handoff. + final TestFirehose firehose = (TestFirehose) task2.getFirehose(); + + // Stop the firehose, this will trigger a finishJob. + firehose.close(); + + awaitHandoffs(); + + // Simulate handoff. + for (Map.Entry> entry : handOffCallbacks.entrySet()) { + final Pair executorRunnablePair = entry.getValue(); + Assert.assertEquals( + new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ), + entry.getKey() + ); + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); + } + handOffCallbacks.clear(); + + // Wait for the task to finish. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); + } + } + + @Test(timeout = 60_000L) + public void testRestoreCorruptData() throws Exception + { + final AppenderatorDriverRealtimeIndexTask task1 = makeRealtimeTask(null); + + // First run: + { + expectPublishedSegments(0); + + final ListenableFuture statusFuture = runTask(task1); + + // Wait for firehose to show up, it starts off null. + while (task1.getFirehose() == null) { + Thread.sleep(50); + } + + final TestFirehose firehose = (TestFirehose) task1.getFirehose(); + + firehose.addRows( + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo") + ) + ); + + // Trigger graceful shutdown. + task1.stopGracefully(); + + // Wait for the task to finish. The status doesn't really matter, but we'll check it anyway. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); + + // Nothing should be published. + Assert.assertTrue(publishedSegments.isEmpty()); + } + + Optional optional = FileUtils.listFiles(baseDir, null, true).stream() + .filter(f -> f.getName().equals("00000.smoosh")) + .findFirst(); + + Assert.assertTrue("Could not find smoosh file", optional.isPresent()); + + // Corrupt the data: + final File smooshFile = optional.get(); + + Files.write(smooshFile.toPath(), StringUtils.toUtf8("oops!")); + + // Second run: + { + expectPublishedSegments(0); + + final AppenderatorDriverRealtimeIndexTask task2 = makeRealtimeTask(task1.getId()); + final ListenableFuture statusFuture = runTask(task2); + + // Wait for the task to finish. + boolean caught = false; + try { + statusFuture.get(); + } + catch (Exception expected) { + caught = true; + } + Assert.assertTrue("expected exception", caught); + } + } + + @Test(timeout = 60_000L) + public void testStopBeforeStarting() throws Exception + { + expectPublishedSegments(0); + + final AppenderatorDriverRealtimeIndexTask task1 = makeRealtimeTask(null); + + task1.stopGracefully(); + final ListenableFuture statusFuture = runTask(task1); + + // Wait for the task to finish. 
+    final TaskStatus taskStatus = statusFuture.get();
+    Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
+  }
+
+  private ListenableFuture<TaskStatus> runTask(final Task task)
+  {
+    try {
+      taskStorage.insert(task, TaskStatus.running(task.getId()));
+    }
+    catch (EntryExistsException e) {
+      // suppress
+    }
+    taskLockbox.syncFromStorage();
+    final TaskToolbox toolbox = taskToolboxFactory.build(task);
+    return taskExec.submit(
+        new Callable<TaskStatus>()
+        {
+          @Override
+          public TaskStatus call() throws Exception
+          {
+            try {
+              if (task.isReady(toolbox.getTaskActionClient())) {
+                return task.run(toolbox);
+              } else {
+                throw new ISE("Task is not ready");
+              }
+            }
+            catch (Exception e) {
+              log.warn(e, "Task failed");
+              throw e;
+            }
+          }
+        }
+    );
+  }
+
+  private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(final String taskId)
+  {
+    return makeRealtimeTask(taskId, TransformSpec.NONE, true, 0);
+  }
+
+  private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions)
+  {
+    return makeRealtimeTask(taskId, TransformSpec.NONE, reportParseExceptions, 0);
+  }
+
+  private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(
+      final String taskId,
+      final TransformSpec transformSpec,
+      final boolean reportParseExceptions,
+      final long handoffTimeout
+  )
+  {
+    ObjectMapper objectMapper = new DefaultObjectMapper();
+    DataSchema dataSchema = new DataSchema(
+        "test_ds",
+        TestHelper.makeJsonMapper().convertValue(
+            new MapInputRowParser(
+                new TimeAndDimsParseSpec(
+                    new TimestampSpec("t", "auto", null),
+                    new DimensionsSpec(
+                        DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim1t")),
+                        null,
+                        null
+                    )
+                )
+            ),
+            JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+        ),
+        new AggregatorFactory[]{new CountAggregatorFactory("rows"), new LongSumAggregatorFactory("met1", "met1")},
+        new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+        transformSpec,
+        objectMapper
+    );
+    RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
+        new TestFirehoseFactory(),
+        null,
+        null
+    );
+    RealtimeAppenderatorTuningConfig tuningConfig = new RealtimeAppenderatorTuningConfig(
+        1000,
+        1000,
+        null,
+        null,
+        null,
+        null,
+        null,
+        reportParseExceptions,
+        handoffTimeout,
+        null,
+        null
+    );
+    return new AppenderatorDriverRealtimeIndexTask(
+        taskId,
+        null,
+        new RealtimeAppenderatorIngestionSpec(dataSchema, realtimeIOConfig, tuningConfig),
+        null
+    )
+    {
+      @Override
+      protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory)
+      {
+        return true;
+      }
+    };
+  }
+
+  private void expectPublishedSegments(int count)
+  {
+    segmentLatch = new CountDownLatch(count);
+    handoffLatch = new CountDownLatch(count);
+  }
+
+  private Collection<DataSegment> awaitSegments() throws InterruptedException
+  {
+    Assert.assertTrue(
+        "Timed out waiting for segments to be published",
+        segmentLatch.await(1, TimeUnit.MINUTES)
+    );
+
+    return publishedSegments;
+  }
+
+  private void awaitHandoffs() throws InterruptedException
+  {
+    Assert.assertTrue(
+        "Timed out waiting for segments to be handed off",
+        handoffLatch.await(1, TimeUnit.MINUTES)
+    );
+  }
+
+  private void makeToolboxFactory(final File directory)
+  {
+    taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
+    taskLockbox = new TaskLockbox(taskStorage);
+
+    publishedSegments = new CopyOnWriteArrayList<>();
+
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.registerSubtypes(LinearShardSpec.class);
+    mapper.registerSubtypes(NumberedShardSpec.class);
+
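+    // Metadata storage coordinator whose announceHistoricalSegments overrides record the published
+    // segments and count down the segment latch set up by expectPublishedSegments().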
+    IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(
+        mapper,
+        derbyConnectorRule.metadataTablesConfigSupplier().get(),
+        derbyConnectorRule.getConnector()
+    )
+    {
+      @Override
+      public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) throws IOException
+      {
+        Set<DataSegment> result = super.announceHistoricalSegments(segments);
+
+        Assert.assertFalse(
+            "Segment latch not initialized, did you forget to call expectPublishedSegments?",
+            segmentLatch == null
+        );
+
+        publishedSegments.addAll(result);
+        segments.forEach(s -> segmentLatch.countDown());
+
+        return result;
+      }
+
+      @Override
+      public SegmentPublishResult announceHistoricalSegments(
+          Set<DataSegment> segments, DataSourceMetadata startMetadata, DataSourceMetadata endMetadata
+      ) throws IOException
+      {
+        SegmentPublishResult result = super.announceHistoricalSegments(segments, startMetadata, endMetadata);
+
+        Assert.assertFalse(
+            "Segment latch not initialized, did you forget to call expectPublishedSegments?",
+            segmentLatch == null
+        );
+
+        publishedSegments.addAll(result.getSegments());
+        result.getSegments().forEach(s -> segmentLatch.countDown());
+
+        return result;
+      }
+    };
+    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
+
+    final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
+        taskLockbox,
+        mdc,
+        emitter,
+        EasyMock.createMock(SupervisorManager.class)
+    );
+    final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
+        taskStorage,
+        taskActionToolbox
+    );
+    IntervalChunkingQueryRunnerDecorator queryRunnerDecorator = new IntervalChunkingQueryRunnerDecorator(
+        null,
+        null,
+        null
+    )
+    {
+      @Override
+      public <T> QueryRunner<T> decorate(
+          QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest
+      )
+      {
+        return delegate;
+      }
+    };
+    final QueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate(
+        ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>of(
+            TimeseriesQuery.class,
+            new TimeseriesQueryRunnerFactory(
+                new TimeseriesQueryQueryToolChest(queryRunnerDecorator),
+                new TimeseriesQueryEngine(),
+                new QueryWatcher()
+                {
+                  @Override
+                  public void registerQuery(Query query, ListenableFuture future)
+                  {
+                    // do nothing
+                  }
+                }
+            )
+        )
+    );
+
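+    // Handoff notifier stub: rather than performing a real handoff, it records each callback so the
+    // tests can simulate handoff by running the stored Executor/Runnable pairs, and counts down the
+    // handoff latch as callbacks are registered.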
+    handOffCallbacks = new ConcurrentHashMap<>();
+    final SegmentHandoffNotifierFactory handoffNotifierFactory = new SegmentHandoffNotifierFactory()
+    {
+      @Override
+      public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
+      {
+        return new SegmentHandoffNotifier()
+        {
+          @Override
+          public boolean registerSegmentHandoffCallback(
+              SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable
+          )
+          {
+            handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable));
+            handoffLatch.countDown();
+            return true;
+          }
+
+          @Override
+          public void start()
+          {
+            // Noop
+          }
+
+          @Override
+          public void close()
+          {
+            // Noop
+          }
+
+          Map<SegmentDescriptor, Pair<Executor, Runnable>> getHandOffCallbacks()
+          {
+            return handOffCallbacks;
+          }
+        };
+      }
+    };
+    final TestUtils testUtils = new TestUtils();
+    SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
+    {
+      @Override
+      public List<StorageLocationConfig> getLocations()
+      {
+        return Lists.newArrayList();
+      }
+    };
+
+    taskToolboxFactory = new TaskToolboxFactory(
+        taskConfig,
+        taskActionClientFactory,
+        emitter,
+        new TestDataSegmentPusher(),
+        new TestDataSegmentKiller(),
+        null, // DataSegmentMover
+        null, // DataSegmentArchiver
+        new TestDataSegmentAnnouncer(),
+        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
+        handoffNotifierFactory,
+        () -> conglomerate,
+        MoreExecutors.sameThreadExecutor(), // queryExecutorService
+        EasyMock.createMock(MonitorScheduler.class),
+        new SegmentLoaderFactory(
+            new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
+        ),
+        testUtils.getTestObjectMapper(),
+        testUtils.getTestIndexIO(),
+        MapCache.create(1024),
+        new CacheConfig(),
+        testUtils.getTestIndexMergerV9(),
+        EasyMock.createNiceMock(DruidNodeAnnouncer.class),
+        EasyMock.createNiceMock(DruidNode.class),
+        new LookupNodeService("tier"),
+        new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0)
+    );
+  }
+
+  public long sumMetric(final Task task, final DimFilter filter, final String metric) throws Exception
+  {
+    // Do a query.
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource("test_ds")
+                                  .filters(filter)
+                                  .aggregators(
+                                      ImmutableList.of(
+                                          new LongSumAggregatorFactory(metric, metric)
+                                      )
+                                  ).granularity(Granularities.ALL)
+                                  .intervals("2000/3000")
+                                  .build();
+
+    List<Result<TimeseriesResultValue>> results =
+        task.getQueryRunner(query).run(QueryPlus.wrap(query), ImmutableMap.of()).toList();
+    return results.isEmpty() ? 0 : results.get(0).getValue().getLongMetric(metric);
+  }
+}
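Note: the tests above poll task.getFirehose() in unbounded sleep loops and rely solely on the JUnit @Test(timeout = ...) to catch hangs. A bounded wait could be factored out along the lines of the following sketch; the PollingAwait class and its awaitNonNull method are hypothetical and not part of this patch.

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public final class PollingAwait
{
  private PollingAwait() {}

  // Polls the supplier every 50 ms until it returns a non-null value or the deadline passes.
  public static <T> T awaitNonNull(Supplier<T> supplier, long timeout, TimeUnit unit) throws InterruptedException
  {
    final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
    T value = supplier.get();
    while (value == null) {
      if (System.nanoTime() > deadlineNanos) {
        throw new IllegalStateException("Timed out after " + timeout + " " + unit);
      }
      Thread.sleep(50);
      value = supplier.get();
    }
    return value;
  }
}

With such a helper, the waits above would read: final TestFirehose firehose = (TestFirehose) PollingAwait.awaitNonNull(task1::getFirehose, 1, TimeUnit.MINUTES);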
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 5ffb5275182..9f050fdcab4 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -30,10 +30,6 @@ import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.core.NoopEmitter;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-import io.druid.java.util.metrics.MonitorScheduler;
 import io.druid.client.cache.CacheConfig;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.Firehose;
@@ -77,6 +73,10 @@ import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.jackson.JacksonUtils;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.java.util.common.parsers.ParseException;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.core.NoopEmitter;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.java.util.metrics.MonitorScheduler;
 import io.druid.math.expr.ExprMacroTable;
 import io.druid.metadata.EntryExistsException;
 import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index b0484c88223..578da275e08 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -35,7 +35,6 @@ import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
 import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.granularity.Granularities;
-import io.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
@@ -52,6 +51,7 @@ import io.druid.segment.realtime.FireDepartmentMetrics;
 import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import io.druid.segment.realtime.plumber.Plumber;
 import io.druid.segment.realtime.plumber.PlumberSchool;
+import io.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.hamcrest.CoreMatchers;
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
index 08d72eab355..a3f8aae5f54 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
@@ -34,10 +34,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-import io.druid.java.util.metrics.Monitor;
-import io.druid.java.util.metrics.MonitorScheduler;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.FirehoseFactory;
@@ -82,6 +78,10 @@ import io.druid.java.util.common.RE;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.guava.Comparators;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.java.util.metrics.Monitor;
+import io.druid.java.util.metrics.MonitorScheduler;
 import io.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
 import io.druid.metadata.TestDerbyConnector;
 import io.druid.query.QueryRunnerFactoryConglomerate;
diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java
index 32c2f5ec962..c9cdf6358ae 100644
--- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java
+++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java
@@ -23,13 +23,13 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.io.Files;
-import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import io.druid.segment.IndexSpec;
 import io.druid.segment.realtime.appenderator.AppenderatorConfig;
 import io.druid.segment.realtime.plumber.IntervalStartVersioningPolicy;
 import io.druid.segment.realtime.plumber.RejectionPolicyFactory;
 import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
 import io.druid.segment.realtime.plumber.VersioningPolicy;
+import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import io.druid.timeline.partition.NoneShardSpec;
 import io.druid.timeline.partition.ShardSpec;
 import org.joda.time.Period;
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
index 91ea83cc449..2fc9cd08c37 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -21,9 +21,6 @@ package io.druid.segment.realtime.appenderator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.core.NoopEmitter;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.client.cache.CacheConfig;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.impl.DimensionsSpec;
@@ -33,7 +30,9 @@ import io.druid.data.input.impl.TimestampSpec;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.granularity.Granularities;
-import io.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.core.NoopEmitter;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import io.druid.query.IntervalChunkingQueryRunnerDecorator;
 import io.druid.query.Query;
@@ -55,6 +54,7 @@ import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import io.druid.server.coordination.DataSegmentAnnouncer;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.LinearShardSpec;
diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index 20375d45754..76c3a1f2b79 100644
--- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
@@ -39,9 +38,7 @@ import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.granularity.Granularities;
-import io.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import io.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import io.druid.query.Query;
 import io.druid.query.QueryRunnerFactory;
@@ -59,6 +56,9 @@ import io.druid.segment.realtime.FireDepartmentMetrics;
 import io.druid.segment.realtime.FireDepartmentTest;
 import io.druid.segment.realtime.FireHydrant;
 import io.druid.segment.realtime.SegmentPublisher;
+import io.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import io.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import io.druid.server.coordination.DataSegmentAnnouncer;
 import io.druid.timeline.DataSegment;
 import org.apache.commons.io.FileUtils;