mirror of https://github.com/apache/druid.git
remove YeOldePlumberSchool.java, unused (#8347)
This commit is contained in:
parent
a4d1219184
commit
b95607d31c
|
@ -1,264 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.indexing.common.index;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterables;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.druid.data.input.Committer;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.query.Query;
|
||||
import org.apache.druid.query.QueryRunner;
|
||||
import org.apache.druid.segment.IndexIO;
|
||||
import org.apache.druid.segment.IndexMergerV9;
|
||||
import org.apache.druid.segment.QueryableIndex;
|
||||
import org.apache.druid.segment.SegmentUtils;
|
||||
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
|
||||
import org.apache.druid.segment.incremental.IndexSizeExceededException;
|
||||
import org.apache.druid.segment.indexing.DataSchema;
|
||||
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
|
||||
import org.apache.druid.segment.indexing.TuningConfigs;
|
||||
import org.apache.druid.segment.loading.DataSegmentPusher;
|
||||
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
|
||||
import org.apache.druid.segment.realtime.FireHydrant;
|
||||
import org.apache.druid.segment.realtime.plumber.Plumber;
|
||||
import org.apache.druid.segment.realtime.plumber.PlumberSchool;
|
||||
import org.apache.druid.segment.realtime.plumber.Sink;
|
||||
import org.apache.druid.timeline.DataSegment;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Trains plumbers that create a single historical segment.
|
||||
*/
|
||||
@JsonTypeName("historical")
|
||||
public class YeOldePlumberSchool implements PlumberSchool
|
||||
{
|
||||
private final Interval interval;
|
||||
private final String version;
|
||||
private final DataSegmentPusher dataSegmentPusher;
|
||||
private final File tmpSegmentDir;
|
||||
private final IndexMergerV9 indexMergerV9;
|
||||
private final IndexIO indexIO;
|
||||
|
||||
private static final Logger log = new Logger(YeOldePlumberSchool.class);
|
||||
|
||||
@JsonCreator
|
||||
public YeOldePlumberSchool(
|
||||
@JsonProperty("interval") Interval interval,
|
||||
@JsonProperty("version") String version,
|
||||
@JacksonInject("segmentPusher") DataSegmentPusher dataSegmentPusher,
|
||||
@JacksonInject("tmpSegmentDir") File tmpSegmentDir,
|
||||
@JacksonInject IndexMergerV9 indexMergerV9,
|
||||
@JacksonInject IndexIO indexIO
|
||||
)
|
||||
{
|
||||
this.interval = interval;
|
||||
this.version = version;
|
||||
this.dataSegmentPusher = dataSegmentPusher;
|
||||
this.tmpSegmentDir = tmpSegmentDir;
|
||||
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
|
||||
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plumber findPlumber(
|
||||
final DataSchema schema,
|
||||
final RealtimeTuningConfig config,
|
||||
final FireDepartmentMetrics metrics
|
||||
)
|
||||
{
|
||||
// There can be only one.
|
||||
final Sink theSink = new Sink(
|
||||
interval,
|
||||
schema,
|
||||
config.getShardSpec(),
|
||||
version,
|
||||
config.getMaxRowsInMemory(),
|
||||
TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
|
||||
config.isReportParseExceptions(),
|
||||
config.getDedupColumn(),
|
||||
null
|
||||
);
|
||||
|
||||
// Temporary directory to hold spilled segments.
|
||||
final File persistDir = new File(tmpSegmentDir, theSink.getSegment().getId().toString());
|
||||
|
||||
// Set of spilled segments. Will be merged at the end.
|
||||
final Set<File> spilled = new HashSet<>();
|
||||
|
||||
return new Plumber()
|
||||
{
|
||||
@Override
|
||||
public Object startJob()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IncrementalIndexAddResult add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
|
||||
{
|
||||
Sink sink = getSink(row.getTimestampFromEpoch());
|
||||
if (sink == null) {
|
||||
return Plumber.THROWAWAY;
|
||||
}
|
||||
|
||||
final IncrementalIndexAddResult addResult = sink.add(row, false);
|
||||
|
||||
if (!sink.canAppendRow()) {
|
||||
persist(committerSupplier.get());
|
||||
}
|
||||
|
||||
return addResult;
|
||||
}
|
||||
|
||||
private Sink getSink(long timestamp)
|
||||
{
|
||||
if (theSink.getInterval().contains(timestamp)) {
|
||||
return theSink;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
|
||||
{
|
||||
throw new UnsupportedOperationException("Don't query me, bro.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void persist(Committer committer)
|
||||
{
|
||||
spillIfSwappable();
|
||||
committer.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finishJob()
|
||||
{
|
||||
// The segment we will upload
|
||||
File fileToUpload = null;
|
||||
|
||||
try {
|
||||
// User should have persisted everything by now.
|
||||
Preconditions.checkState(!theSink.swappable(), "All data must be persisted before fininshing the job!");
|
||||
|
||||
if (spilled.size() == 0) {
|
||||
throw new IllegalStateException("Nothing indexed?");
|
||||
} else if (spilled.size() == 1) {
|
||||
fileToUpload = Iterables.getOnlyElement(spilled);
|
||||
} else {
|
||||
List<QueryableIndex> indexes = new ArrayList<>();
|
||||
for (final File oneSpill : spilled) {
|
||||
indexes.add(indexIO.loadIndex(oneSpill));
|
||||
}
|
||||
|
||||
fileToUpload = new File(tmpSegmentDir, "merged");
|
||||
indexMergerV9.mergeQueryableIndex(
|
||||
indexes,
|
||||
schema.getGranularitySpec().isRollup(),
|
||||
schema.getAggregators(),
|
||||
fileToUpload,
|
||||
config.getIndexSpec(),
|
||||
config.getSegmentWriteOutMediumFactory()
|
||||
);
|
||||
}
|
||||
|
||||
// Map merged segment so we can extract dimensions
|
||||
final QueryableIndex mappedSegment = indexIO.loadIndex(fileToUpload);
|
||||
|
||||
final DataSegment segmentToUpload = theSink.getSegment()
|
||||
.withDimensions(ImmutableList.copyOf(mappedSegment.getAvailableDimensions()))
|
||||
.withBinaryVersion(SegmentUtils.getVersionFromDir(fileToUpload));
|
||||
|
||||
dataSegmentPusher.push(fileToUpload, segmentToUpload, false);
|
||||
|
||||
log.info("Uploaded segment[%s]", segmentToUpload.getId());
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn(e, "Failed to merge and upload");
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
finally {
|
||||
try {
|
||||
if (fileToUpload != null) {
|
||||
log.info("Deleting Index File[%s]", fileToUpload);
|
||||
FileUtils.deleteDirectory(fileToUpload);
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.warn(e, "Error deleting directory[%s]", fileToUpload);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void spillIfSwappable()
|
||||
{
|
||||
if (theSink.swappable()) {
|
||||
final FireHydrant indexToPersist = theSink.swap();
|
||||
final int rowsToPersist = indexToPersist.getIndex().size();
|
||||
final File dirToPersist = getSpillDir(indexToPersist.getCount());
|
||||
|
||||
log.info("Spilling index[%d] with rows[%d] to: %s", indexToPersist.getCount(), rowsToPersist, dirToPersist);
|
||||
|
||||
try {
|
||||
indexMergerV9.persist(
|
||||
indexToPersist.getIndex(),
|
||||
dirToPersist,
|
||||
config.getIndexSpecForIntermediatePersists(),
|
||||
config.getSegmentWriteOutMediumFactory()
|
||||
);
|
||||
|
||||
indexToPersist.swapSegment(null);
|
||||
|
||||
metrics.incrementRowOutputCount(rowsToPersist);
|
||||
|
||||
spilled.add(dirToPersist);
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn(e, "Failed to spill index[%d]", indexToPersist.getCount());
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private File getSpillDir(final int n)
|
||||
{
|
||||
return new File(persistDir, StringUtils.format("spill%d", n));
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue