Rename FiniteAppenderatorDriver to AppenderatorDriver (#4356)

This commit is contained in:
Jihoon Son 2017-06-03 00:48:44 +09:00 committed by Gian Merlino
parent 0efd18247b
commit f876246af7
6 changed files with 29 additions and 29 deletions

View File

@ -65,9 +65,9 @@ import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.appenderator.Appenderator; import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorDriver;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators; import io.druid.segment.realtime.appenderator.Appenderators;
import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.firehose.ChatHandler; import io.druid.segment.realtime.firehose.ChatHandler;
@ -282,7 +282,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
try ( try (
final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox); final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics); final AppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
final KafkaConsumer<byte[], byte[]> consumer = newConsumer() final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
) { ) {
toolbox.getDataSegmentServerAnnouncer().announce(); toolbox.getDataSegmentServerAnnouncer().announce();
@ -871,13 +871,13 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
); );
} }
private FiniteAppenderatorDriver newDriver( private AppenderatorDriver newDriver(
final Appenderator appenderator, final Appenderator appenderator,
final TaskToolbox toolbox, final TaskToolbox toolbox,
final FireDepartmentMetrics metrics final FireDepartmentMetrics metrics
) )
{ {
return new FiniteAppenderatorDriver( return new AppenderatorDriver(
appenderator, appenderator,
new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema), new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema),
toolbox.getSegmentHandoffNotifierFactory(), toolbox.getSegmentHandoffNotifierFactory(),

View File

@ -77,7 +77,7 @@ import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorConfig; import io.druid.segment.realtime.appenderator.AppenderatorConfig;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators; import io.druid.segment.realtime.appenderator.Appenderators;
import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver; import io.druid.segment.realtime.appenderator.AppenderatorDriver;
import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.appenderator.SegmentAllocator;
import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
@ -400,7 +400,7 @@ public class IndexTask extends AbstractTask
try ( try (
final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema); final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema);
final FiniteAppenderatorDriver driver = newDriver( final AppenderatorDriver driver = newDriver(
appenderator, appenderator,
toolbox, toolbox,
segmentAllocator, segmentAllocator,
@ -543,14 +543,14 @@ public class IndexTask extends AbstractTask
); );
} }
private FiniteAppenderatorDriver newDriver( private AppenderatorDriver newDriver(
final Appenderator appenderator, final Appenderator appenderator,
final TaskToolbox toolbox, final TaskToolbox toolbox,
final SegmentAllocator segmentAllocator, final SegmentAllocator segmentAllocator,
final FireDepartmentMetrics metrics final FireDepartmentMetrics metrics
) )
{ {
return new FiniteAppenderatorDriver( return new AppenderatorDriver(
appenderator, appenderator,
segmentAllocator, segmentAllocator,
new NoopSegmentHandoffNotifierFactory(), // don't wait for handoff since we don't serve queries new NoopSegmentHandoffNotifierFactory(), // don't wait for handoff since we don't serve queries

View File

@ -64,7 +64,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* A FiniteAppenderatorDriver drives an Appenderator to index a finite stream of data. This class does not help you * A AppenderatorDriver drives an Appenderator to index a finite stream of data. This class does not help you
* index unbounded streams. All handoff is done at the end of indexing. * index unbounded streams. All handoff is done at the end of indexing.
* *
* This class helps with doing things that Appenderators don't, including deciding which segments to use (with a * This class helps with doing things that Appenderators don't, including deciding which segments to use (with a
@ -74,9 +74,9 @@ import java.util.stream.Collectors;
* Note that the commit metadata stored by this class via the underlying Appenderator is not the same metadata as * Note that the commit metadata stored by this class via the underlying Appenderator is not the same metadata as
* you pass in. It's wrapped in some extra metadata needed by the driver. * you pass in. It's wrapped in some extra metadata needed by the driver.
*/ */
public class FiniteAppenderatorDriver implements Closeable public class AppenderatorDriver implements Closeable
{ {
private static final Logger log = new Logger(FiniteAppenderatorDriver.class); private static final Logger log = new Logger(AppenderatorDriver.class);
private final Appenderator appenderator; private final Appenderator appenderator;
private final SegmentAllocator segmentAllocator; private final SegmentAllocator segmentAllocator;
@ -111,7 +111,7 @@ public class FiniteAppenderatorDriver implements Closeable
* @param objectMapper object mapper, used for serde of commit metadata * @param objectMapper object mapper, used for serde of commit metadata
* @param metrics Firedepartment metrics * @param metrics Firedepartment metrics
*/ */
public FiniteAppenderatorDriver( public AppenderatorDriver(
Appenderator appenderator, Appenderator appenderator,
SegmentAllocator segmentAllocator, SegmentAllocator segmentAllocator,
SegmentHandoffNotifierFactory handoffNotifierFactory, SegmentHandoffNotifierFactory handoffNotifierFactory,

View File

@ -25,7 +25,7 @@ import io.druid.data.input.InputRow;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** /**
* Result of {@link FiniteAppenderatorDriver#add(InputRow, String, Supplier)}. It contains the identifier of the * Result of {@link AppenderatorDriver#add(InputRow, String, Supplier)}. It contains the identifier of the
* segment which the InputRow is added to and the number of rows in that segment. * segment which the InputRow is added to and the number of rows in that segment.
*/ */
public class AppenderatorDriverAddResult public class AppenderatorDriverAddResult

View File

@ -39,9 +39,9 @@ import io.druid.query.QueryRunner;
import io.druid.query.SegmentDescriptor; import io.druid.query.SegmentDescriptor;
import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriverTest.TestCommitterSupplier; import io.druid.segment.realtime.appenderator.AppenderatorDriverTest.TestCommitterSupplier;
import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriverTest.TestSegmentAllocator; import io.druid.segment.realtime.appenderator.AppenderatorDriverTest.TestSegmentAllocator;
import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriverTest.TestSegmentHandoffNotifierFactory; import io.druid.segment.realtime.appenderator.AppenderatorDriverTest.TestSegmentHandoffNotifierFactory;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.hamcrest.CoreMatchers; import org.hamcrest.CoreMatchers;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -65,7 +65,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class FiniteAppenderatorDriverFailTest public class AppenderatorDriverFailTest
{ {
private static final String DATA_SOURCE = "foo"; private static final String DATA_SOURCE = "foo";
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
@ -91,7 +91,7 @@ public class FiniteAppenderatorDriverFailTest
SegmentAllocator allocator; SegmentAllocator allocator;
TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory; TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
FiniteAppenderatorDriver driver; AppenderatorDriver driver;
@Rule @Rule
public ExpectedException expectedException = ExpectedException.none(); public ExpectedException expectedException = ExpectedException.none();
@ -117,7 +117,7 @@ public class FiniteAppenderatorDriverFailTest
{ {
expectedException.expect(TimeoutException.class); expectedException.expect(TimeoutException.class);
driver = new FiniteAppenderatorDriver( driver = new AppenderatorDriver(
createPersistFailAppenderator(), createPersistFailAppenderator(),
allocator, allocator,
segmentHandoffNotifierFactory, segmentHandoffNotifierFactory,
@ -139,7 +139,7 @@ public class FiniteAppenderatorDriverFailTest
} }
driver.publish( driver.publish(
FiniteAppenderatorDriverTest.makeOkPublisher(), AppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(), committerSupplier.get(),
ImmutableList.of("dummy") ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
@ -151,7 +151,7 @@ public class FiniteAppenderatorDriverFailTest
expectedException.expect(ExecutionException.class); expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(InterruptedException.class)); expectedException.expectCause(CoreMatchers.instanceOf(InterruptedException.class));
driver = new FiniteAppenderatorDriver( driver = new AppenderatorDriver(
createPushInterruptAppenderator(), createPushInterruptAppenderator(),
allocator, allocator,
segmentHandoffNotifierFactory, segmentHandoffNotifierFactory,
@ -173,7 +173,7 @@ public class FiniteAppenderatorDriverFailTest
} }
driver.publish( driver.publish(
FiniteAppenderatorDriverTest.makeOkPublisher(), AppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(), committerSupplier.get(),
ImmutableList.of("dummy") ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
@ -184,7 +184,7 @@ public class FiniteAppenderatorDriverFailTest
{ {
expectedException.expect(TimeoutException.class); expectedException.expect(TimeoutException.class);
driver = new FiniteAppenderatorDriver( driver = new AppenderatorDriver(
createPushFailAppenderator(), createPushFailAppenderator(),
allocator, allocator,
segmentHandoffNotifierFactory, segmentHandoffNotifierFactory,
@ -206,7 +206,7 @@ public class FiniteAppenderatorDriverFailTest
} }
driver.publish( driver.publish(
FiniteAppenderatorDriverTest.makeOkPublisher(), AppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(), committerSupplier.get(),
ImmutableList.of("dummy") ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
@ -221,7 +221,7 @@ public class FiniteAppenderatorDriverFailTest
"Fail test while dropping segment[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123]" "Fail test while dropping segment[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123]"
); );
driver = new FiniteAppenderatorDriver( driver = new AppenderatorDriver(
createDropFailAppenderator(), createDropFailAppenderator(),
allocator, allocator,
segmentHandoffNotifierFactory, segmentHandoffNotifierFactory,
@ -243,7 +243,7 @@ public class FiniteAppenderatorDriverFailTest
} }
final SegmentsAndMetadata published = driver.publish( final SegmentsAndMetadata published = driver.publish(
FiniteAppenderatorDriverTest.makeOkPublisher(), AppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(), committerSupplier.get(),
ImmutableList.of("dummy") ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);

View File

@ -64,7 +64,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
public class FiniteAppenderatorDriverTest public class AppenderatorDriverTest
{ {
private static final String DATA_SOURCE = "foo"; private static final String DATA_SOURCE = "foo";
private static final String VERSION = "abc123"; private static final String VERSION = "abc123";
@ -95,7 +95,7 @@ public class FiniteAppenderatorDriverTest
SegmentAllocator allocator; SegmentAllocator allocator;
AppenderatorTester appenderatorTester; AppenderatorTester appenderatorTester;
TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory; TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
FiniteAppenderatorDriver driver; AppenderatorDriver driver;
@Before @Before
public void setUp() public void setUp()
@ -103,7 +103,7 @@ public class FiniteAppenderatorDriverTest
appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY); appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY);
allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR); allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory(); segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
driver = new FiniteAppenderatorDriver( driver = new AppenderatorDriver(
appenderatorTester.getAppenderator(), appenderatorTester.getAppenderator(),
allocator, allocator,
segmentHandoffNotifierFactory, segmentHandoffNotifierFactory,