From 094936ca039f5a204d525416f22e746095056acc Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Wed, 23 Oct 2019 16:52:02 -0700
Subject: [PATCH] Remove commit() method Firehose (#8688)
* Remove commit() method Firehose
* fix javadoc
---
.../apache/druid/data/input/Committer.java | 3 +
.../org/apache/druid/data/input/Firehose.java | 38 ++-----------
.../druid/data/input/FirehoseFactory.java | 4 +-
.../input/impl/FileIteratingFirehose.java | 7 ---
.../AppenderatorDriverRealtimeIndexTask.java | 3 +-
.../common/task/RealtimeIndexTask.java | 3 +-
.../overlord/sampler/SamplerCache.java | 7 ---
.../SeekableStreamSamplerSpec.java | 7 ---
.../druid/indexing/common/TestFirehose.java | 7 ---
...penderatorDriverRealtimeIndexTaskTest.java | 7 ---
.../indexing/overlord/TaskLifecycleTest.java | 26 ---------
.../firehose/CombiningFirehoseFactory.java | 6 --
.../EventReceiverFirehoseFactory.java | 7 ---
.../firehose/FixedCountFirehoseFactory.java | 6 --
.../firehose/IngestSegmentFirehose.java | 7 ---
.../realtime/firehose/InlineFirehose.java | 7 ---
.../realtime/firehose/PredicateFirehose.java | 6 --
.../realtime/firehose/SqlFirehose.java | 7 ---
.../firehose/TimedShutoffFirehoseFactory.java | 6 --
.../segment/realtime/plumber/Committers.java | 55 +++----------------
.../CombiningFirehoseFactoryTest.java | 7 ---
.../realtime/firehose/InlineFirehoseTest.java | 9 ---
.../plumber/RealtimePlumberSchoolTest.java | 32 ++++++++---
23 files changed, 44 insertions(+), 223 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/data/input/Committer.java b/core/src/main/java/org/apache/druid/data/input/Committer.java
index 0f8e31bd714..b4410e185e9 100644
--- a/core/src/main/java/org/apache/druid/data/input/Committer.java
+++ b/core/src/main/java/org/apache/druid/data/input/Committer.java
@@ -21,6 +21,8 @@ package org.apache.druid.data.input;
import org.apache.druid.guice.annotations.ExtensionPoint;
+import javax.annotation.Nullable;
+
/**
* Committer includes a Runnable and a Jackson-serialized metadata object containing the offset
*/
@@ -32,5 +34,6 @@ public interface Committer extends Runnable
* which needs to be serialized and deserialized by Jackson.
* Commit metadata can be a complex type, but we recommend keeping it to List/Map/"Primitive JSON" types
*/
+ @Nullable
Object getMetadata();
}
diff --git a/core/src/main/java/org/apache/druid/data/input/Firehose.java b/core/src/main/java/org/apache/druid/data/input/Firehose.java
index a1af12fb6d7..c732d9dc322 100644
--- a/core/src/main/java/org/apache/druid/data/input/Firehose.java
+++ b/core/src/main/java/org/apache/druid/data/input/Firehose.java
@@ -38,10 +38,9 @@ import java.io.IOException;
* any) run out.
*
* Concurrency:
- * The three methods {@link #hasMore()}, {@link #nextRow()} and {@link #commit()} are all called from the same thread.
- * {@link #commit()}, however, returns a callback which will be called on another thread. {@link #close()} might be
- * called concurrently from a thread different from the thread calling {@link #hasMore()}, {@link #nextRow()} and {@link
- * #commit()}.
+ * The two methods {@link #hasMore()} and {@link #nextRow()} are all called from the same thread.
+ * {@link #close()} might be called concurrently from a thread different from the thread calling {@link #hasMore()}
+ * and {@link #nextRow()}.
*
*/
@ExtensionPoint
@@ -86,36 +85,9 @@ public interface Firehose extends Closeable
}
/**
- * Returns a runnable that will "commit" everything read up to the point at which commit() is called. This is
- * often equivalent to everything that has been read since the last commit() call (or instantiation of the object),
- * but doesn't necessarily have to be.
- *
- * This method is called when the main processing loop starts to persist its current batch of things to process.
- * The returned runnable will be run when the current batch has been successfully persisted, there is usually
- * some time lag between when this method is called and when the runnable is run. The Runnable is also run on
- * a separate thread so its operation should be thread-safe.
- *
- * The Runnable is essentially just a lambda/closure that is run() after data supplied by this instance has
- * been committed on the writer side of this interface protocol.
- *
- * A simple implementation of this interface might do nothing when run() is called
- * (in which case the same do-nothing instance can be returned every time), or
- * a more complex implementation might clean up temporary resources that are no longer needed
- * because of InputRows delivered by prior calls to {@link #nextRow()}.
- *
- */
- Runnable commit();
-
- /**
- * Closes the "ingestion side" of the Firehose, potentially concurrently with calls to {@link #hasMore()}, {@link
- * #nextRow()} and {@link #commit()} being made from a different thread. {@link #hasMore()} and {@link #nextRow()}
+ * Closes the "ingestion side" of the Firehose, potentially concurrently with calls to {@link #hasMore()} and {@link
+ * #nextRow()} being made from a different thread. {@link #hasMore()} and {@link #nextRow()}
* continue to work after close(), but since the ingestion side is closed rows will eventually run out.
- *
- * The effects of calling run() on the {@link Runnable} object returned from {@link #commit()} (in other words,
- * doing the commit) concurrently or after close() are unspecified: commit may not be performed silently (that is,
- * run() call completes without an Exception, but the commit is not actually done), or a error may result. Note that
- * {@link #commit()} method itself can be called concurrently with close(), but it doesn't make much sense, because
- * run() on the returned Runnable then can't be called.
*/
@Override
void close() throws IOException;
diff --git a/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java b/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java
index e4c5d389343..287f3d253c2 100644
--- a/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java
+++ b/core/src/main/java/org/apache/druid/data/input/FirehoseFactory.java
@@ -42,7 +42,7 @@ public interface FirehoseFactory
* Initialization method that connects up the fire hose. If this method returns successfully it should be safe to
* call hasMore() on the returned Firehose (which might subsequently block).
*
- * If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return
+ * If this method returns null, then any attempt to call hasMore(), nextRow() and close() on the return
* value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
* invalid configuration is preferred over returning null.
*
@@ -58,7 +58,7 @@ public interface FirehoseFactory
* Initialization method that connects up the fire hose. If this method returns successfully it should be safe to
* call hasMore() on the returned Firehose (which might subsequently block).
*
- * If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return
+ * If this method returns null, then any attempt to call hasMore(), nextRow() and close() on the return
* value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
* invalid configuration is preferred over returning null.
*
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java b/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java
index 2c2963c8c68..9c167b7bbc6 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java
@@ -25,7 +25,6 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowPlusRaw;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable;
import java.io.Closeable;
@@ -109,12 +108,6 @@ public class FileIteratingFirehose implements Firehose
return iterator;
}
- @Override
- public Runnable commit()
- {
- return Runnables.getNoopRunnable();
- }
-
@Override
public void close() throws IOException
{
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index c48acea51b9..d6850fe30e0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -272,7 +272,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
this.metrics = fireDepartmentForMetrics.getMetrics();
- Supplier committerSupplier = null;
+ final Supplier committerSupplier = Committers.nilSupplier();
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
DiscoveryDruidNode discoveryDruidNode = createDiscoveryDruidNode(toolbox);
@@ -351,7 +351,6 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
synchronized (this) {
if (!gracefullyStopped) {
firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir);
- committerSupplier = Committers.supplierFromFirehose(firehose);
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
index 0dc3ee53633..5b8731edf58 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
@@ -351,7 +351,7 @@ public class RealtimeIndexTask extends AbstractTask
this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, metrics);
- Supplier committerSupplier = null;
+ final Supplier committerSupplier = Committers.nilSupplier();
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
LookupNodeService lookupNodeService = getContextValue(CTX_KEY_LOOKUP_TIER) == null ?
@@ -387,7 +387,6 @@ public class RealtimeIndexTask extends AbstractTask
synchronized (this) {
if (!gracefullyStopped) {
firehose = firehoseFactory.connect(spec.getDataSchema().getParser(), firehoseTempDir);
- committerSupplier = Committers.supplierFromFirehose(firehose);
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerCache.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerCache.java
index 3c3d6640b33..dc7e04d7142 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerCache.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerCache.java
@@ -30,7 +30,6 @@ import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable;
import javax.inject.Inject;
@@ -172,12 +171,6 @@ public class SamplerCache
}
}
- @Override
- public Runnable commit()
- {
- return Runnables.getNoopRunnable();
- }
-
@Override
public void close()
{
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
index c7303bc55cf..119e516c1a3 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
@@ -39,7 +39,6 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.utils.Runnables;
import javax.annotation.Nullable;
import java.io.File;
@@ -168,12 +167,6 @@ public abstract class SeekableStreamSamplerSpec
return rv;
}
- @Override
- public Runnable commit()
- {
- return currentFirehose.commit();
- }
-
@Override
public void close() throws IOException
{
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
index 6842f8a26bc..b3f40c74b99 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
@@ -49,7 +49,6 @@ import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.utils.Runnables;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@@ -448,12 +447,6 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory supplierFromRunnable(final Runnable runnable)
- {
- final Committer committer = new Committer()
- {
- @Override
- public Object getMetadata()
- {
- return null;
- }
-
- @Override
- public void run()
- {
- runnable.run();
- }
- };
- return Suppliers.ofInstance(committer);
- }
-
- public static Supplier supplierFromFirehose(final Firehose firehose)
- {
- return new Supplier()
- {
- @Override
- public Committer get()
- {
- final Runnable commitRunnable = firehose.commit();
- return new Committer()
- {
- @Override
- public Object getMetadata()
- {
- return null;
- }
-
- @Override
- public void run()
- {
- commitRunnable.run();
- }
- };
- }
- };
- }
+ private static final Supplier NIL_SUPPLIER = Suppliers.ofInstance(NIL);
public static Committer nil()
{
return NIL;
}
+
+ public static Supplier nilSupplier()
+ {
+ return NIL_SUPPLIER;
+ }
}
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactoryTest.java
index c08111d8795..cb2be4fe40d 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactoryTest.java
@@ -26,7 +26,6 @@ import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.utils.Runnables;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
@@ -140,12 +139,6 @@ public class CombiningFirehoseFactoryTest
return iterator.next();
}
- @Override
- public Runnable commit()
- {
- return Runnables.getNoopRunnable();
- }
-
@Override
public void close()
{
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseTest.java
index 5e3568bf240..f24cc8528ec 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseTest.java
@@ -25,7 +25,6 @@ import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.utils.Runnables;
import org.junit.Assert;
import org.junit.Test;
@@ -144,14 +143,6 @@ public class InlineFirehoseTest
Assert.assertNotNull(rowPlusRaw.getParseException());
}
- @Test
- public void testCommit()
- {
- InlineFirehose target = create(NOT_EMPTY);
- Runnable result = target.commit();
- Assert.assertSame(Runnables.getNoopRunnable(), result);
- }
-
@Test
public void testCloseOpen() throws IOException
{
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index 31790305808..617f17a700f 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.segment.realtime.plumber;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -335,15 +336,10 @@ public class RealtimePlumberSchoolTest
final CountDownLatch doneSignal = new CountDownLatch(1);
plumber.persist(
- Committers.supplierFromRunnable(
- new Runnable()
- {
- @Override
- public void run()
- {
- doneSignal.countDown();
- throw new RuntimeException();
- }
+ supplierFromRunnable(
+ () -> {
+ doneSignal.countDown();
+ throw new RuntimeException();
}
).get()
);
@@ -682,4 +678,22 @@ public class RealtimePlumberSchoolTest
};
}
+ private static Supplier supplierFromRunnable(final Runnable runnable)
+ {
+ final Committer committer = new Committer()
+ {
+ @Override
+ public Object getMetadata()
+ {
+ return null;
+ }
+
+ @Override
+ public void run()
+ {
+ runnable.run();
+ }
+ };
+ return Suppliers.ofInstance(committer);
+ }
}