Remove commit() method Firehose (#8688)

* Remove commit() method Firehose

* fix javadoc
This commit is contained in:
Jihoon Son 2019-10-23 16:52:02 -07:00 committed by Fangjin Yang
parent 2518478b20
commit 094936ca03
23 changed files with 44 additions and 223 deletions

View File

@ -21,6 +21,8 @@ package org.apache.druid.data.input;
import org.apache.druid.guice.annotations.ExtensionPoint; import org.apache.druid.guice.annotations.ExtensionPoint;
import javax.annotation.Nullable;
/** /**
* Committer includes a Runnable and a Jackson-serialized metadata object containing the offset * Committer includes a Runnable and a Jackson-serialized metadata object containing the offset
*/ */
@ -32,5 +34,6 @@ public interface Committer extends Runnable
* which needs to be serialized and deserialized by Jackson. * which needs to be serialized and deserialized by Jackson.
* Commit metadata can be a complex type, but we recommend keeping it to List/Map/"Primitive JSON" types * Commit metadata can be a complex type, but we recommend keeping it to List/Map/"Primitive JSON" types
*/ */
@Nullable
Object getMetadata(); Object getMetadata();
} }

View File

@ -38,10 +38,9 @@ import java.io.IOException;
* any) run out. * any) run out.
* *
* Concurrency: * Concurrency:
* The three methods {@link #hasMore()}, {@link #nextRow()} and {@link #commit()} are all called from the same thread. * The two methods {@link #hasMore()} and {@link #nextRow()} are all called from the same thread.
* {@link #commit()}, however, returns a callback which will be called on another thread. {@link #close()} might be * {@link #close()} might be called concurrently from a thread different from the thread calling {@link #hasMore()}
* called concurrently from a thread different from the thread calling {@link #hasMore()}, {@link #nextRow()} and {@link * and {@link #nextRow()}.
* #commit()}.
* </p> * </p>
*/ */
@ExtensionPoint @ExtensionPoint
@ -86,36 +85,9 @@ public interface Firehose extends Closeable
} }
/** /**
* Returns a runnable that will "commit" everything read up to the point at which commit() is called. This is * Closes the "ingestion side" of the Firehose, potentially concurrently with calls to {@link #hasMore()} and {@link
* often equivalent to everything that has been read since the last commit() call (or instantiation of the object), * #nextRow()} being made from a different thread. {@link #hasMore()} and {@link #nextRow()}
* but doesn't necessarily have to be.
*
* This method is called when the main processing loop starts to persist its current batch of things to process.
* The returned runnable will be run when the current batch has been successfully persisted, there is usually
* some time lag between when this method is called and when the runnable is run. The Runnable is also run on
* a separate thread so its operation should be thread-safe.
*
* The Runnable is essentially just a lambda/closure that is run() after data supplied by this instance has
* been committed on the writer side of this interface protocol.
* <p>
* A simple implementation of this interface might do nothing when run() is called
* (in which case the same do-nothing instance can be returned every time), or
* a more complex implementation might clean up temporary resources that are no longer needed
* because of InputRows delivered by prior calls to {@link #nextRow()}.
* </p>
*/
Runnable commit();
/**
* Closes the "ingestion side" of the Firehose, potentially concurrently with calls to {@link #hasMore()}, {@link
* #nextRow()} and {@link #commit()} being made from a different thread. {@link #hasMore()} and {@link #nextRow()}
* continue to work after close(), but since the ingestion side is closed rows will eventually run out. * continue to work after close(), but since the ingestion side is closed rows will eventually run out.
*
* The effects of calling run() on the {@link Runnable} object returned from {@link #commit()} (in other words,
* doing the commit) concurrently or after close() are unspecified: commit may not be performed silently (that is,
* run() call completes without an Exception, but the commit is not actually done), or a error may result. Note that
* {@link #commit()} method itself can be called concurrently with close(), but it doesn't make much sense, because
* run() on the returned Runnable then can't be called.
*/ */
@Override @Override
void close() throws IOException; void close() throws IOException;

View File

@ -42,7 +42,7 @@ public interface FirehoseFactory<T extends InputRowParser>
* Initialization method that connects up the fire hose. If this method returns successfully it should be safe to * Initialization method that connects up the fire hose. If this method returns successfully it should be safe to
* call hasMore() on the returned Firehose (which might subsequently block). * call hasMore() on the returned Firehose (which might subsequently block).
* <p/> * <p/>
* If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return * If this method returns null, then any attempt to call hasMore(), nextRow() and close() on the return
* value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on * value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
* invalid configuration is preferred over returning null. * invalid configuration is preferred over returning null.
* *
@ -58,7 +58,7 @@ public interface FirehoseFactory<T extends InputRowParser>
* Initialization method that connects up the fire hose. If this method returns successfully it should be safe to * Initialization method that connects up the fire hose. If this method returns successfully it should be safe to
* call hasMore() on the returned Firehose (which might subsequently block). * call hasMore() on the returned Firehose (which might subsequently block).
* <p/> * <p/>
* If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return * If this method returns null, then any attempt to call hasMore(), nextRow() and close() on the return
* value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on * value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
* invalid configuration is preferred over returning null. * invalid configuration is preferred over returning null.
* <p/> * <p/>

View File

@ -25,7 +25,6 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowPlusRaw; import org.apache.druid.data.input.InputRowPlusRaw;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable; import java.io.Closeable;
@ -109,12 +108,6 @@ public class FileIteratingFirehose implements Firehose
return iterator; return iterator;
} }
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {

View File

@ -272,7 +272,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
this.metrics = fireDepartmentForMetrics.getMetrics(); this.metrics = fireDepartmentForMetrics.getMetrics();
Supplier<Committer> committerSupplier = null; final Supplier<Committer> committerSupplier = Committers.nilSupplier();
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
DiscoveryDruidNode discoveryDruidNode = createDiscoveryDruidNode(toolbox); DiscoveryDruidNode discoveryDruidNode = createDiscoveryDruidNode(toolbox);
@ -351,7 +351,6 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
synchronized (this) { synchronized (this) {
if (!gracefullyStopped) { if (!gracefullyStopped) {
firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir); firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir);
committerSupplier = Committers.supplierFromFirehose(firehose);
} }
} }

View File

@ -351,7 +351,7 @@ public class RealtimeIndexTask extends AbstractTask
this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, metrics); this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, metrics);
Supplier<Committer> committerSupplier = null; final Supplier<Committer> committerSupplier = Committers.nilSupplier();
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
LookupNodeService lookupNodeService = getContextValue(CTX_KEY_LOOKUP_TIER) == null ? LookupNodeService lookupNodeService = getContextValue(CTX_KEY_LOOKUP_TIER) == null ?
@ -387,7 +387,6 @@ public class RealtimeIndexTask extends AbstractTask
synchronized (this) { synchronized (this) {
if (!gracefullyStopped) { if (!gracefullyStopped) {
firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir); firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir);
committerSupplier = Committers.supplierFromFirehose(firehose);
} }
} }

View File

@ -30,7 +30,6 @@ import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.inject.Inject; import javax.inject.Inject;
@ -172,12 +171,6 @@ public class SamplerCache
} }
} }
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override @Override
public void close() public void close()
{ {

View File

@ -39,7 +39,6 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.File; import java.io.File;
@ -168,12 +167,6 @@ public abstract class SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetT
} }
} }
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override @Override
public void close() public void close()
{ {

View File

@ -30,7 +30,6 @@ import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.Runnables;
import java.io.File; import java.io.File;
import java.io.InputStream; import java.io.InputStream;
@ -190,12 +189,6 @@ public class TestFirehose implements Firehose
} }
} }
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override @Override
public void close() public void close()
{ {

View File

@ -131,7 +131,6 @@ import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec; import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.utils.Runnables;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Period; import org.joda.time.Period;
@ -224,12 +223,6 @@ public class AppenderatorDriverRealtimeIndexTaskTest
} }
} }
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override @Override
public void close() public void close()
{ {

View File

@ -290,19 +290,6 @@ public class TaskLifecycleTest
throw new RuntimeException("HA HA HA"); throw new RuntimeException("HA HA HA");
} }
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
}
};
}
@Override @Override
public void close() public void close()
{ {
@ -345,19 +332,6 @@ public class TaskLifecycleTest
return inputRowIterator.next(); return inputRowIterator.next();
} }
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
}
};
}
@Override @Override
public void close() public void close()
{ {

View File

@ -121,12 +121,6 @@ public class CombiningFirehoseFactory implements FirehoseFactory<InputRowParser>
return rv; return rv;
} }
@Override
public Runnable commit()
{
return currentFirehose.commit();
}
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {

View File

@ -49,7 +49,6 @@ import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.utils.Runnables;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -448,12 +447,6 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowPar
} }
} }
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override @Override
public int getCurrentBufferSize() public int getCurrentBufferSize()
{ {

View File

@ -83,12 +83,6 @@ public class FixedCountFirehoseFactory implements FirehoseFactory
return delegateFirehose.nextRow(); return delegateFirehose.nextRow();
} }
@Override
public Runnable commit()
{
return delegateFirehose.commit();
}
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {

View File

@ -44,7 +44,6 @@ import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.filter.Filters; import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.transform.Transformer; import org.apache.druid.segment.transform.Transformer;
import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
@ -202,12 +201,6 @@ public class IngestSegmentFirehose implements Firehose
return transformer.transform(inputRow); return transformer.transform(inputRow);
} }
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {

View File

@ -27,7 +27,6 @@ import org.apache.druid.data.input.InputRowPlusRaw;
import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.Runnables;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
@ -85,12 +84,6 @@ public class InlineFirehose implements Firehose
} }
} }
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {

View File

@ -81,12 +81,6 @@ public class PredicateFirehose implements Firehose
return row; return row;
} }
@Override
public Runnable commit()
{
return firehose.commit();
}
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {

View File

@ -27,7 +27,6 @@ import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.prefetch.JsonIterator; import org.apache.druid.data.input.impl.prefetch.JsonIterator;
import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.transform.Transformer; import org.apache.druid.segment.transform.Transformer;
import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable; import java.io.Closeable;
@ -85,12 +84,6 @@ public class SqlFirehose implements Firehose
return resultIterator.next(); return resultIterator.next();
} }
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {

View File

@ -125,12 +125,6 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowPars
return firehose.nextRowWithRaw(); return firehose.nextRowWithRaw();
} }
@Override
public Runnable commit()
{
return firehose.commit();
}
/** /**
* This method is synchronized because it might be called concurrently from multiple threads: from {@link * This method is synchronized because it might be called concurrently from multiple threads: from {@link
* #shutdownExec}, and explicitly on this Firehose object. * #shutdownExec}, and explicitly on this Firehose object.

View File

@ -22,12 +22,14 @@ package org.apache.druid.segment.realtime.plumber;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.base.Suppliers; import com.google.common.base.Suppliers;
import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.Firehose;
import javax.annotation.Nullable;
public class Committers public class Committers
{ {
private static final Committer NIL = new Committer() private static final Committer NIL = new Committer()
{ {
@Nullable
@Override @Override
public Object getMetadata() public Object getMetadata()
{ {
@ -40,54 +42,15 @@ public class Committers
// Do nothing // Do nothing
} }
}; };
private static final Supplier<Committer> NIL_SUPPLIER = Suppliers.ofInstance(NIL);
public static Supplier<Committer> supplierFromRunnable(final Runnable runnable)
{
final Committer committer = new Committer()
{
@Override
public Object getMetadata()
{
return null;
}
@Override
public void run()
{
runnable.run();
}
};
return Suppliers.ofInstance(committer);
}
public static Supplier<Committer> supplierFromFirehose(final Firehose firehose)
{
return new Supplier<Committer>()
{
@Override
public Committer get()
{
final Runnable commitRunnable = firehose.commit();
return new Committer()
{
@Override
public Object getMetadata()
{
return null;
}
@Override
public void run()
{
commitRunnable.run();
}
};
}
};
}
public static Committer nil() public static Committer nil()
{ {
return NIL; return NIL;
} }
public static Supplier<Committer> nilSupplier()
{
return NIL_SUPPLIER;
}
} }

View File

@ -26,7 +26,6 @@ import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.Runnables;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -140,12 +139,6 @@ public class CombiningFirehoseFactoryTest
return iterator.next(); return iterator.next();
} }
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override @Override
public void close() public void close()
{ {

View File

@ -25,7 +25,6 @@ import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.utils.Runnables;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -144,14 +143,6 @@ public class InlineFirehoseTest
Assert.assertNotNull(rowPlusRaw.getParseException()); Assert.assertNotNull(rowPlusRaw.getParseException());
} }
@Test
public void testCommit()
{
InlineFirehose target = create(NOT_EMPTY);
Runnable result = target.commit();
Assert.assertSame(Runnables.getNoopRunnable(), result);
}
@Test @Test
public void testCloseOpen() throws IOException public void testCloseOpen() throws IOException
{ {

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.realtime.plumber; package org.apache.druid.segment.realtime.plumber;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers; import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -335,15 +336,10 @@ public class RealtimePlumberSchoolTest
final CountDownLatch doneSignal = new CountDownLatch(1); final CountDownLatch doneSignal = new CountDownLatch(1);
plumber.persist( plumber.persist(
Committers.supplierFromRunnable( supplierFromRunnable(
new Runnable() () -> {
{ doneSignal.countDown();
@Override throw new RuntimeException();
public void run()
{
doneSignal.countDown();
throw new RuntimeException();
}
} }
).get() ).get()
); );
@ -682,4 +678,22 @@ public class RealtimePlumberSchoolTest
}; };
} }
private static Supplier<Committer> supplierFromRunnable(final Runnable runnable)
{
final Committer committer = new Committer()
{
@Override
public Object getMetadata()
{
return null;
}
@Override
public void run()
{
runnable.run();
}
};
return Suppliers.ofInstance(committer);
}
} }