Remove ServerView from RealtimeIndexTasks and use coordinator http endpoint for handoffs

- fixes #1970
- extracted segment handoff callbacks into SegmentHandoffNotifier,
which is responsible for tracking segment handoffs and invoking callbacks
when handoff is complete.
- Coordinator now maintains a view of segments in the cluster; this
will affect the JVM heap requirements for the overlord for large
clusters.
- realtime index tasks and nodes now use HTTP endpoints exposed by the
coordinator to get the server view (see the sketch below)
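
The handoff contract introduced here is easiest to see from the call sites further down in this diff. As a rough sketch only (the `HandoffSketch` class and `waitForHandoff` method are illustrative, not part of this commit; signatures are taken from the test doubles and the RealtimePlumber changes below), a realtime task obtains a notifier from the factory, starts it, and registers a callback per segment:

```java
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.timeline.DataSegment;

public class HandoffSketch
{
  // Register a callback that fires once the coordinator reports the segment as
  // served by a historical; this mirrors what RealtimePlumber does in this patch.
  public static SegmentHandoffNotifier waitForHandoff(
      SegmentHandoffNotifierFactory factory,
      DataSegment segment,
      Runnable onHandoff
  )
  {
    SegmentHandoffNotifier notifier = factory.createSegmentHandoffNotifier(segment.getDataSource());
    notifier.start();
    notifier.registerSegmentHandoffCallback(
        new SegmentDescriptor(
            segment.getInterval(),
            segment.getVersion(),
            segment.getShardSpec().getPartitionNum()
        ),
        MoreExecutors.sameThreadExecutor(),
        onHandoff  // e.g. unannounce and drop the locally served segment
    );
    return notifier;
  }
}
```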

review comment

fix realtime node guice injection

review comments

make test not rely on scheduled exec

fix compilation

fix import

review comment

introduce ImmutableSegmentLoadInfo

fix json reading

remove unnecessary logging
Nishant 2015-11-26 23:52:20 +05:30
parent 345299aefe
commit 9491e8de3b
38 changed files with 2292 additions and 312 deletions

View File

@ -37,6 +37,17 @@ public interface TimelineLookup<VersionType, ObjectType>
*/
public Iterable<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval);
/**
* Does a lookup for the objects representing the given time interval. Will also return
* incomplete PartitionHolders.
*
* @param interval interval to find objects for
*
* @return Holders representing the interval that the objects exist for, PartitionHolders
* can be incomplete. Holders returned sorted by the interval.
*/
public Iterable<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval);
public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version);
}
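
To make the difference from plain `lookup()` concrete, here is a minimal, hedged usage sketch of the new method (the timeline instance and interval below are made up for illustration; an empty timeline simply yields no holders):

```java
import com.google.common.collect.Ordering;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;

public class LookupSketch
{
  public static void main(String[] args)
  {
    VersionedIntervalTimeline<String, String> timeline =
        new VersionedIntervalTimeline<>(Ordering.natural());

    // Unlike lookup(), this variant also returns holders whose partition sets are
    // not yet complete, which is what the coordinator's server view needs while
    // segments are still being handed off.
    for (TimelineObjectHolder<String, String> holder :
        timeline.lookupWithIncompletePartitions(new Interval("2015-11-26/2015-11-27"))) {
      System.out.println(holder.getInterval() + " " + holder.getVersion());
    }
  }
}
```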

View File

@ -192,6 +192,18 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
}
}
@Override
public Iterable<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval)
{
try {
lock.readLock().lock();
return lookup(interval, true);
}
finally {
lock.readLock().unlock();
}
}
public Set<TimelineObjectHolder<VersionType, ObjectType>> findOvershadowed()
{
try {

View File

@ -257,6 +257,16 @@ This config is used to find the [Indexing Service](../design/indexing-service.ht
|--------|-----------|-------|
|`druid.selectors.indexing.serviceName`|The druid.service name of the indexing service Overlord node. To start the Overlord with a different name, set it with this property. |druid/overlord|
### Coordinator Discovery
This config is used to find the [Coordinator](../design/coordinator.html) using Curator service discovery. Realtime indexing nodes use it to get information about the segments loaded in the cluster.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.selectors.coordinator.serviceName`|The druid.service name of the coordinator node. To start the Coordinator with a different name, set it with this property. |druid/coordinator|
### Announcing Segments
You can optionally configure how to announce and unannounce Znodes in ZooKeeper (using Curator). For normal operations you do not need to override any of these configs.

View File

@ -63,6 +63,8 @@ druid.cache.readBufferSize=10485760
# Indexing Service Service Discovery
druid.selectors.indexing.serviceName=druid:prod:overlord
# Coordinator Service Discovery
druid.selectors.coordinator.serviceName=druid:prod:coordinator
```
### Overlord Node

View File

@ -45,6 +45,7 @@ import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
@ -63,14 +64,14 @@ public class TaskToolbox
{
private final TaskConfig config;
private final Task task;
private final TaskActionClientFactory taskActionClientFactory;
private final TaskActionClient taskActionClient;
private final ServiceEmitter emitter;
private final DataSegmentPusher segmentPusher;
private final DataSegmentKiller dataSegmentKiller;
private final DataSegmentArchiver dataSegmentArchiver;
private final DataSegmentMover dataSegmentMover;
private final DataSegmentAnnouncer segmentAnnouncer;
private final FilteredServerView newSegmentServerView;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final MonitorScheduler monitorScheduler;
private final ExecutorService queryExecutorService;
@ -86,14 +87,14 @@ public class TaskToolbox
public TaskToolbox(
TaskConfig config,
Task task,
TaskActionClientFactory taskActionClientFactory,
TaskActionClient taskActionClient,
ServiceEmitter emitter,
DataSegmentPusher segmentPusher,
DataSegmentKiller dataSegmentKiller,
DataSegmentMover dataSegmentMover,
DataSegmentArchiver dataSegmentArchiver,
DataSegmentAnnouncer segmentAnnouncer,
FilteredServerView newSegmentServerView,
SegmentHandoffNotifierFactory handoffNotifierFactory,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
ExecutorService queryExecutorService,
MonitorScheduler monitorScheduler,
@ -108,14 +109,14 @@ public class TaskToolbox
{
this.config = config;
this.task = task;
this.taskActionClientFactory = taskActionClientFactory;
this.taskActionClient = taskActionClient;
this.emitter = emitter;
this.segmentPusher = segmentPusher;
this.dataSegmentKiller = dataSegmentKiller;
this.dataSegmentMover = dataSegmentMover;
this.dataSegmentArchiver = dataSegmentArchiver;
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.queryExecutorService = queryExecutorService;
this.monitorScheduler = monitorScheduler;
@ -135,7 +136,7 @@ public class TaskToolbox
public TaskActionClient getTaskActionClient()
{
return taskActionClientFactory.create(task);
return taskActionClient;
}
public ServiceEmitter getEmitter()
@ -168,9 +169,9 @@ public class TaskToolbox
return segmentAnnouncer;
}
public FilteredServerView getNewSegmentServerView()
public SegmentHandoffNotifierFactory getSegmentHandoffNotifierFactory()
{
return newSegmentServerView;
return handoffNotifierFactory;
}
public QueryRunnerFactoryConglomerate getQueryRunnerFactoryConglomerate()

View File

@ -24,10 +24,10 @@ import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.guice.annotations.Processing;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
@ -38,6 +38,7 @@ import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;
import java.io.File;
@ -56,7 +57,7 @@ public class TaskToolboxFactory
private final DataSegmentMover dataSegmentMover;
private final DataSegmentArchiver dataSegmentArchiver;
private final DataSegmentAnnouncer segmentAnnouncer;
private final FilteredServerView newSegmentServerView;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final ExecutorService queryExecutorService;
private final MonitorScheduler monitorScheduler;
@ -77,7 +78,7 @@ public class TaskToolboxFactory
DataSegmentMover dataSegmentMover,
DataSegmentArchiver dataSegmentArchiver,
DataSegmentAnnouncer segmentAnnouncer,
FilteredServerView newSegmentServerView,
SegmentHandoffNotifierFactory handoffNotifierFactory,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@Processing ExecutorService queryExecutorService,
MonitorScheduler monitorScheduler,
@ -97,7 +98,7 @@ public class TaskToolboxFactory
this.dataSegmentMover = dataSegmentMover;
this.dataSegmentArchiver = dataSegmentArchiver;
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.queryExecutorService = queryExecutorService;
this.monitorScheduler = monitorScheduler;
@ -112,18 +113,17 @@ public class TaskToolboxFactory
public TaskToolbox build(Task task)
{
final File taskWorkDir = config.getTaskWorkDir(task.getId());
return new TaskToolbox(
config,
task,
taskActionClientFactory,
taskActionClientFactory.create(task),
emitter,
segmentPusher,
dataSegmentKiller,
dataSegmentMover,
dataSegmentArchiver,
segmentAnnouncer,
newSegmentServerView,
handoffNotifierFactory,
queryRunnerFactoryConglomerate,
queryExecutorService,
monitorScheduler,

View File

@ -287,7 +287,7 @@ public class RealtimeIndexTask extends AbstractTask
toolbox.getSegmentPusher(),
lockingSegmentAnnouncer,
segmentPublisher,
toolbox.getNewSegmentServerView(),
toolbox.getSegmentHandoffNotifierFactory(),
toolbox.getQueryExecutorService(),
toolbox.getIndexMerger(),
toolbox.getIndexIO(),

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.indexing.common.actions.TaskActionClientFactory;
@ -39,6 +38,7 @@ import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
@ -66,7 +66,9 @@ public class TaskToolboxTest
private DataSegmentMover mockDataSegmentMover = EasyMock.createMock(DataSegmentMover.class);
private DataSegmentArchiver mockDataSegmentArchiver = EasyMock.createMock(DataSegmentArchiver.class);
private DataSegmentAnnouncer mockSegmentAnnouncer = EasyMock.createMock(DataSegmentAnnouncer.class);
private FilteredServerView mockNewSegmentServerView = EasyMock.createMock(FilteredServerView.class);
private SegmentHandoffNotifierFactory mockHandoffNotifierFactory = EasyMock.createNiceMock(
SegmentHandoffNotifierFactory.class
);
private QueryRunnerFactoryConglomerate mockQueryRunnerFactoryConglomerate
= EasyMock.createMock(QueryRunnerFactoryConglomerate.class);
private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class);
@ -86,7 +88,8 @@ public class TaskToolboxTest
public void setUp() throws IOException
{
EasyMock.expect(task.getId()).andReturn("task_id").anyTimes();
EasyMock.replay(task);
EasyMock.expect(task.getDataSource()).andReturn("task_ds").anyTimes();
EasyMock.replay(task, mockHandoffNotifierFactory);
taskToolbox = new TaskToolboxFactory(
new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, null, null),
@ -97,7 +100,7 @@ public class TaskToolboxTest
mockDataSegmentMover,
mockDataSegmentArchiver,
mockSegmentAnnouncer,
mockNewSegmentServerView,
mockHandoffNotifierFactory,
mockQueryRunnerFactoryConglomerate,
mockQueryExecutorService,
mockMonitorScheduler,
@ -122,12 +125,6 @@ public class TaskToolboxTest
Assert.assertEquals(mockSegmentAnnouncer,taskToolbox.build(task).getSegmentAnnouncer());
}
@Test
public void testGetNewSegmentServerView()
{
Assert.assertEquals(mockNewSegmentServerView,taskToolbox.build(task).getNewSegmentServerView());
}
@Test
public void testGetQueryRunnerFactoryConglomerate()
{

View File

@ -223,26 +223,19 @@ public class IndexTaskTest
indexTask.run(
new TaskToolbox(
null, null, new TaskActionClientFactory()
null, null, new TaskActionClient()
{
@Override
public TaskActionClient create(Task task)
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
return new TaskActionClient()
{
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
if (taskAction instanceof LockListAction) {
return (RetType) Arrays.asList(
new TaskLock(
"", "", null, new DateTime().toString()
)
);
}
return null;
}
};
if (taskAction instanceof LockListAction) {
return (RetType) Arrays.asList(
new TaskLock(
"", "", null, new DateTime().toString()
)
);
}
return null;
}
}, null, new DataSegmentPusher()
{

View File

@ -25,11 +25,13 @@ import com.google.api.client.util.Sets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.EmittingLogger;
@ -60,7 +62,6 @@ import io.druid.indexing.test.TestDataSegmentAnnouncer;
import io.druid.indexing.test.TestDataSegmentKiller;
import io.druid.indexing.test.TestDataSegmentPusher;
import io.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import io.druid.indexing.test.TestServerView;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
import io.druid.query.Druids;
@ -72,6 +73,7 @@ import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -90,9 +92,10 @@ import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.metrics.EventReceiverFirehoseRegister;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
@ -107,7 +110,9 @@ import java.io.File;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
public class RealtimeIndexTaskTest
{
@ -136,6 +141,8 @@ public class RealtimeIndexTaskTest
private DateTime now;
private ListeningExecutorService taskExec;
private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks;
private SegmentHandoffNotifierFactory handoffNotifierFactory;
@Before
public void setUp()
@ -204,9 +211,10 @@ public class RealtimeIndexTaskTest
Assert.assertEquals(2, countEvents(task));
// Simulate handoff.
for (DataSegment segment : mdc.getPublished()) {
((TestServerView) taskToolbox.getNewSegmentServerView()).segmentAdded(dummyServer, segment);
for(Pair<Executor, Runnable> executorRunnablePair : handOffCallbacks.values()){
executorRunnablePair.lhs.execute(executorRunnablePair.rhs);
}
handOffCallbacks.clear();
// Wait for the task to finish.
final TaskStatus taskStatus = statusFuture.get();
@ -294,9 +302,10 @@ public class RealtimeIndexTaskTest
Assert.assertEquals(2, countEvents(task2));
// Simulate handoff.
for (DataSegment segment : mdc.getPublished()) {
((TestServerView) taskToolbox.getNewSegmentServerView()).segmentAdded(dummyServer, segment);
for(Pair<Executor, Runnable> executorRunnablePair : handOffCallbacks.values()){
executorRunnablePair.lhs.execute(executorRunnablePair.rhs);
}
handOffCallbacks.clear();
// Wait for the task to finish.
final TaskStatus taskStatus = statusFuture.get();
@ -485,6 +494,42 @@ public class RealtimeIndexTaskTest
)
)
);
handOffCallbacks = Maps.newConcurrentMap();
handoffNotifierFactory = new SegmentHandoffNotifierFactory()
{
@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
{
return new SegmentHandoffNotifier()
{
@Override
public boolean registerSegmentHandoffCallback(
SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable
)
{
handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable));
return true;
}
@Override
public void start()
{
//Noop
}
@Override
public void stop()
{
//Noop
}
Map<SegmentDescriptor, Pair<Executor, Runnable>> getHandOffCallbacks()
{
return handOffCallbacks;
}
};
}
};
final TestUtils testUtils = new TestUtils();
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
taskConfig,
@ -495,7 +540,7 @@ public class RealtimeIndexTaskTest
null, // DataSegmentMover
null, // DataSegmentArchiver
new TestDataSegmentAnnouncer(),
new TestServerView(),
handoffNotifierFactory,
conglomerate,
MoreExecutors.sameThreadExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),

View File

@ -78,8 +78,10 @@ import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.AfterClass;
import org.junit.Assert;
@ -198,6 +200,8 @@ public class IngestSegmentFirehoseFactoryTest
ts,
new TaskActionToolbox(tl, mdc, newMockEmitter())
);
SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
EasyMock.replay(notifierFactory);
final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, null, null),
@ -249,7 +253,7 @@ public class IngestSegmentFirehoseFactoryTest
}
},
null, // segment announcer
null, // new segment server view
notifierFactory,
null, // query runner factory conglomerate corporation unionized collective
null, // query executor service
null, // monitor scheduler
@ -521,5 +525,7 @@ public class IngestSegmentFirehoseFactoryTest
}
};
}
}

View File

@ -62,10 +62,12 @@ import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import org.apache.commons.io.FileUtils;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
@ -293,6 +295,8 @@ public class IngestSegmentFirehoseFactoryTimelineTest
}
}
};
SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
EasyMock.replay(notifierFactory);
final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, null, null),
new TaskActionClientFactory()
@ -308,8 +312,8 @@ public class IngestSegmentFirehoseFactoryTimelineTest
null, // segment killer
null, // segment mover
null, // segment archiver
null, // segment announcer
null, // new segment server view
null, // segment announcer,
notifierFactory,
null, // query runner factory conglomerate corporation unionized collective
null, // query executor service
null, // monitor scheduler

View File

@ -26,16 +26,17 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Comparators;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Event;
@ -43,8 +44,6 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView;
import io.druid.client.ServerView;
import io.druid.client.cache.MapCache;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
@ -61,6 +60,7 @@ import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
@ -69,7 +69,6 @@ import io.druid.indexing.common.task.AbstractFixedIntervalTask;
import io.druid.indexing.common.task.IndexTask;
import io.druid.indexing.common.task.KillTask;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.RealtimeIndexTaskTest;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.config.TaskQueueConfig;
@ -78,6 +77,7 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.metadata.SQLMetadataStorageActionHandlerFactory;
import io.druid.metadata.TestDerbyConnector;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -98,8 +98,9 @@ import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentTest;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
@ -118,7 +119,6 @@ import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
@ -197,7 +197,6 @@ public class TaskLifecycleTest
private TaskToolboxFactory tb = null;
private IndexSpec indexSpec;
private QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private FilteredServerView serverView;
private MonitorScheduler monitorScheduler;
private ServiceEmitter emitter;
private TaskQueueConfig tqc;
@ -205,7 +204,9 @@ public class TaskLifecycleTest
private int announcedSinks;
private static CountDownLatch publishCountDown;
private TestDerbyConnector testDerbyConnector;
private List<ServerView.SegmentCallback> segmentCallbacks = new ArrayList<>();
private SegmentHandoffNotifierFactory handoffNotifierFactory;
private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks;
private static TestIndexerMetadataStorageCoordinator newMockMDC()
{
@ -393,15 +394,42 @@ public class TaskLifecycleTest
} else {
throw new RuntimeException(String.format("Unknown task storage type [%s]", taskStorageType));
}
serverView = new FilteredServerView()
handOffCallbacks = Maps.newConcurrentMap();
handoffNotifierFactory = new SegmentHandoffNotifierFactory()
{
@Override
public void registerSegmentCallback(
Executor exec, ServerView.SegmentCallback callback, Predicate<DataSegment> filter
)
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
{
segmentCallbacks.add(callback);
return new SegmentHandoffNotifier()
{
@Override
public boolean registerSegmentHandoffCallback(
SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable
)
{
handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable));
return true;
}
@Override
public void start()
{
//Noop
}
@Override
public void stop()
{
//Noop
}
Map<SegmentDescriptor, Pair<Executor, Runnable>> getHandOffCallbacks()
{
return handOffCallbacks;
}
};
}
};
setUpAndStartTaskQueue(
@ -485,7 +513,7 @@ public class TaskLifecycleTest
}
}, // segment announcer
serverView, // new segment server view
handoffNotifierFactory,
queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
null, // query executor service
monitorScheduler, // monitor scheduler
@ -855,16 +883,10 @@ public class TaskLifecycleTest
Assert.assertTrue(publishCountDown.await(1000, TimeUnit.MILLISECONDS));
// Realtime Task has published the segment, simulate loading of segment to a historical node so that task finishes with SUCCESS status
segmentCallbacks.get(0).segmentAdded(
new DruidServerMetadata(
"dummy",
"dummy_host",
0,
"historical",
"dummy_tier",
0
), mdc.getPublished().iterator().next()
);
Assert.assertEquals(1, handOffCallbacks.size());
Pair<Executor, Runnable> executorRunnablePair = Iterables.getOnlyElement(handOffCallbacks.values());
executorRunnablePair.lhs.execute(executorRunnablePair.rhs);
handOffCallbacks.clear();
// Wait for realtime index task to handle callback in plumber and succeed
while (tsqa.getStatus(taskId).get().isRunnable()) {

View File

@ -32,7 +32,10 @@ import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.TestRealtimeTask;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
import io.druid.indexing.overlord.ThreadPoolTaskRunner;
import io.druid.indexing.worker.config.WorkerConfig;
@ -41,6 +44,7 @@ import io.druid.segment.IndexMerger;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
@ -48,6 +52,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
@ -137,6 +142,11 @@ public class WorkerTaskMonitorTest
private WorkerTaskMonitor createTaskMonitor()
{
final TaskConfig taskConfig = new TaskConfig(Files.createTempDir().toString(), null, null, 0, null, null, null);
TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class);
EasyMock.expect(taskActionClientFactory.create(EasyMock.<Task>anyObject())).andReturn(taskActionClient).anyTimes();
SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
EasyMock.replay(taskActionClientFactory, taskActionClient, notifierFactory);
return new WorkerTaskMonitor(
jsonMapper,
cf,
@ -144,7 +154,8 @@ public class WorkerTaskMonitorTest
new ThreadPoolTaskRunner(
new TaskToolboxFactory(
taskConfig,
null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
taskActionClientFactory,
null, null, null, null, null, null, notifierFactory, null, null, null, new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(
null,
new SegmentLoaderConfig()

View File

@ -0,0 +1,209 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.concurrent.Execs;
import io.druid.query.DataSource;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
* ServerView of coordinator for the state of segments being loaded in the cluster.
*/
public class CoordinatorServerView implements InventoryView
{
private static final Logger log = new Logger(CoordinatorServerView.class);
private final Object lock = new Object();
private final Map<String, SegmentLoadInfo> segmentLoadInfos;
private final Map<String, VersionedIntervalTimeline<String, SegmentLoadInfo>> timelines;
private final ServerInventoryView baseView;
private volatile boolean initialized = false;
@Inject
public CoordinatorServerView(
ServerInventoryView baseView
)
{
this.baseView = baseView;
this.segmentLoadInfos = Maps.newHashMap();
this.timelines = Maps.newHashMap();
ExecutorService exec = Execs.singleThreaded("CoordinatorServerView-%s");
baseView.registerSegmentCallback(
exec,
new ServerView.SegmentCallback()
{
@Override
public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
{
serverAddedSegment(server, segment);
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentRemoved(final DruidServerMetadata server, DataSegment segment)
{
serverRemovedSegment(server, segment);
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentViewInitialized()
{
initialized = true;
return ServerView.CallbackAction.CONTINUE;
}
}
);
baseView.registerServerCallback(
exec,
new ServerView.ServerCallback()
{
@Override
public ServerView.CallbackAction serverRemoved(DruidServer server)
{
removeServer(server);
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
public boolean isInitialized()
{
return initialized;
}
public void clear()
{
synchronized (lock) {
timelines.clear();
segmentLoadInfos.clear();
}
}
private void removeServer(DruidServer server)
{
for (DataSegment segment : server.getSegments().values()) {
serverRemovedSegment(server.getMetadata(), segment);
}
}
private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment)
{
String segmentId = segment.getIdentifier();
synchronized (lock) {
log.debug("Adding segment[%s] for server[%s]", segment, server);
SegmentLoadInfo segmentLoadInfo = segmentLoadInfos.get(segmentId);
if (segmentLoadInfo == null) {
// servers escape the scope of this object so use ConcurrentSet
segmentLoadInfo = new SegmentLoadInfo(segment);
VersionedIntervalTimeline<String, SegmentLoadInfo> timeline = timelines.get(segment.getDataSource());
if (timeline == null) {
timeline = new VersionedIntervalTimeline<>(Ordering.natural());
timelines.put(segment.getDataSource(), timeline);
}
timeline.add(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(segmentLoadInfo)
);
segmentLoadInfos.put(segmentId, segmentLoadInfo);
}
segmentLoadInfo.addServer(server);
}
}
private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment)
{
String segmentId = segment.getIdentifier();
synchronized (lock) {
log.debug("Removing segment[%s] from server[%s].", segmentId, server);
final SegmentLoadInfo segmentLoadInfo = segmentLoadInfos.get(segmentId);
if (segmentLoadInfo == null) {
log.warn("Told to remove non-existant segment[%s]", segmentId);
return;
}
segmentLoadInfo.removeServer(server);
if (segmentLoadInfo.isEmpty()) {
VersionedIntervalTimeline<String, SegmentLoadInfo> timeline = timelines.get(segment.getDataSource());
segmentLoadInfos.remove(segmentId);
final PartitionChunk<SegmentLoadInfo> removedPartition = timeline.remove(
segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(
new SegmentLoadInfo(
segment
)
)
);
if (removedPartition == null) {
log.warn(
"Asked to remove timeline entry[interval: %s, version: %s] that doesn't exist",
segment.getInterval(),
segment.getVersion()
);
}
}
}
}
public VersionedIntervalTimeline<String, SegmentLoadInfo> getTimeline(DataSource dataSource)
{
String table = Iterables.getOnlyElement(dataSource.getNames());
synchronized (lock) {
return timelines.get(table);
}
}
@Override
public DruidServer getInventoryValue(String string)
{
return baseView.getInventoryValue(string);
}
@Override
public Iterable<DruidServer> getInventory()
{
return baseView.getInventory();
}
}
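
A hedged sketch of how this view might be consumed (this is roughly what the coordinator's new `serverview` endpoint does with it; the `ServerViewSketch` helper below is illustrative and not part of this commit):

```java
import com.google.common.collect.Lists;
import io.druid.client.CoordinatorServerView;
import io.druid.client.ImmutableSegmentLoadInfo;
import io.druid.client.SegmentLoadInfo;
import io.druid.query.TableDataSource;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;

import java.util.List;

public class ServerViewSketch
{
  // Collect, for every segment overlapping the interval, the servers currently
  // serving it. Incomplete partition sets are included on purpose, since callers
  // such as the handoff notifier only care about individual partitions.
  public static List<ImmutableSegmentLoadInfo> serverViewFor(
      CoordinatorServerView view,
      String dataSource,
      Interval interval
  )
  {
    List<ImmutableSegmentLoadInfo> result = Lists.newArrayList();
    VersionedIntervalTimeline<String, SegmentLoadInfo> timeline =
        view.getTimeline(new TableDataSource(dataSource));
    if (timeline == null) {
      return result;
    }
    for (TimelineObjectHolder<String, SegmentLoadInfo> holder :
        timeline.lookupWithIncompletePartitions(interval)) {
      for (PartitionChunk<SegmentLoadInfo> chunk : holder.getObject()) {
        result.add(chunk.getObject().toImmutableSegmentLoadInfo());
      }
    }
    return result;
  }
}
```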

View File

@ -0,0 +1,96 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import java.util.Set;
public class ImmutableSegmentLoadInfo
{
private final DataSegment segment;
private final ImmutableSet<DruidServerMetadata> servers;
@JsonCreator
public ImmutableSegmentLoadInfo(
@JsonProperty("segment") DataSegment segment,
@JsonProperty("servers") Set<DruidServerMetadata> servers
)
{
Preconditions.checkNotNull(segment, "segment");
Preconditions.checkNotNull(servers, "servers");
this.segment = segment;
this.servers = ImmutableSet.copyOf(servers);
}
@JsonProperty("segment")
public DataSegment getSegment()
{
return segment;
}
@JsonProperty("servers")
public ImmutableSet<DruidServerMetadata> getServers()
{
return servers;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ImmutableSegmentLoadInfo that = (ImmutableSegmentLoadInfo) o;
if (!segment.equals(that.segment)) {
return false;
}
return servers.equals(that.servers);
}
@Override
public int hashCode()
{
int result = segment.hashCode();
result = 31 * result + servers.hashCode();
return result;
}
@Override
public String toString()
{
return "SegmentLoadInfo{" +
"segment=" + segment +
", servers=" + servers +
'}';
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import java.util.List;
import java.util.Set;
public class SegmentLoadInfo
{
private final DataSegment segment;
private final Set<DruidServerMetadata> servers;
public SegmentLoadInfo(DataSegment segment)
{
Preconditions.checkNotNull(segment, "segment");
this.segment = segment;
this.servers = Sets.newConcurrentHashSet();
}
public DataSegment getSegment()
{
return segment;
}
public boolean addServer(DruidServerMetadata server)
{
return servers.add(server);
}
public boolean removeServer(DruidServerMetadata server)
{
return servers.remove(server);
}
public boolean isEmpty()
{
return servers.isEmpty();
}
public ImmutableSegmentLoadInfo toImmutableSegmentLoadInfo()
{
return new ImmutableSegmentLoadInfo(segment, servers);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SegmentLoadInfo that = (SegmentLoadInfo) o;
if (!segment.equals(that.segment)) {
return false;
}
return servers.equals(that.servers);
}
@Override
public int hashCode()
{
int result = segment.hashCode();
result = 31 * result + servers.hashCode();
return result;
}
@Override
public String toString()
{
return "SegmentLoadInfo{" +
"segment=" + segment +
", servers=" + servers +
'}';
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.coordinator;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*/
@BindingAnnotation
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Coordinator
{
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.coordinator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.ImmutableSegmentLoadInfo;
import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Interval;
import java.net.URI;
import java.net.URL;
import java.util.List;
public class CoordinatorClient
{
private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8);
private final HttpClient client;
private final ObjectMapper jsonMapper;
private final ServerDiscoverySelector selector;
@Inject
public CoordinatorClient(
@Global HttpClient client,
ObjectMapper jsonMapper,
@Coordinator ServerDiscoverySelector selector
)
{
this.client = client;
this.jsonMapper = jsonMapper;
this.selector = selector;
}
public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource, Interval interval, boolean incompleteOk)
{
try {
StatusResponseHolder response = client.go(
new Request(
HttpMethod.GET,
new URL(
String.format(
"%s/datasources/%s/intervals/%s/serverview?partial=%s",
baseUrl(),
dataSource,
interval.toString().replace("/", "_"),
incompleteOk
)
)
),
RESPONSE_HANDLER
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while fetching serverView status[%s] content[%s]",
response.getStatus(),
response.getContent()
);
}
return jsonMapper.readValue(
response.getContent(), new TypeReference<List<ImmutableSegmentLoadInfo>>()
{
}
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
private String baseUrl()
{
try {
final Server instance = selector.pick();
if (instance == null) {
throw new ISE("Cannot find instance of coordinator");
}
return new URI(
instance.getScheme(),
null,
instance.getAddress(),
instance.getPort(),
"/druid/coordinator/v1",
null,
null
).toString();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
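
Usage is straightforward; a minimal sketch, assuming a Guice-injected `CoordinatorClient` (the helper class and printed output below are illustrative only):

```java
import io.druid.client.ImmutableSegmentLoadInfo;
import io.druid.client.coordinator.CoordinatorClient;
import io.druid.server.coordination.DruidServerMetadata;
import org.joda.time.Interval;

import java.util.List;

public class CoordinatorClientSketch
{
  // Ask the coordinator which servers already serve the segments overlapping an
  // interval; passing true allows incomplete (partial) partition sets in the answer.
  public static void printServerView(CoordinatorClient client, String dataSource, Interval interval)
  {
    List<ImmutableSegmentLoadInfo> loaded = client.fetchServerView(dataSource, interval, true);
    for (ImmutableSegmentLoadInfo info : loaded) {
      for (DruidServerMetadata server : info.getServers()) {
        System.out.println(info.getSegment().getIdentifier() + " -> " + server.getHost());
      }
    }
  }
}
```

Under the hood this resolves the coordinator via service discovery and issues a GET against `/druid/coordinator/v1/datasources/{dataSource}/intervals/{interval}/serverview?partial=...`, with the `/` in the interval replaced by `_`, as shown in `fetchServerView` above.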

View File

@ -0,0 +1,37 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public class CoordinatorSelectorConfig
{
public static final String DEFAULT_SERVICE_NAME = "druid/coordinator";
@JsonProperty
private String serviceName = DEFAULT_SERVICE_NAME;
public String getServiceName()
{
return serviceName;
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import io.druid.client.coordinator.Coordinator;
import io.druid.client.coordinator.CoordinatorSelectorConfig;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
/**
*/
public class CoordinatorDiscoveryModule implements Module
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.selectors.coordinator", CoordinatorSelectorConfig.class);
}
@Provides
@Coordinator
@ManageLifecycle
public ServerDiscoverySelector getServiceProvider(
CoordinatorSelectorConfig config,
ServerDiscoveryFactory serverDiscoveryFactory
)
{
return serverDiscoveryFactory.createSelector(config.getServiceName());
}
}

View File

@ -35,6 +35,7 @@ import io.druid.curator.CuratorModule;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.AWSModule;
import io.druid.guice.AnnouncerModule;
import io.druid.guice.CoordinatorDiscoveryModule;
import io.druid.guice.DruidProcessingModule;
import io.druid.guice.DruidSecondaryModule;
import io.druid.guice.ExtensionsConfig;
@ -298,6 +299,7 @@ public class Initialization
new DerbyMetadataStorageDruidModule(),
new JacksonConfigManagerModule(),
new IndexingServiceDiscoveryModule(),
new CoordinatorDiscoveryModule(),
new LocalDataStorageDruidModule(),
new FirehoseModule(),
new ParsersModule()

View File

@ -0,0 +1,172 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.plumber;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import io.druid.client.ImmutableSegmentLoadInfo;
import io.druid.client.SegmentLoadInfo;
import io.druid.client.coordinator.CoordinatorClient;
import io.druid.concurrent.Execs;
import io.druid.query.SegmentDescriptor;
import io.druid.server.coordination.DruidServerMetadata;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNotifier
{
private static final Logger log = new Logger(CoordinatorBasedSegmentHandoffNotifier.class);
private final ConcurrentMap<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks = Maps.newConcurrentMap();
private final CoordinatorClient coordinatorClient;
private volatile ScheduledExecutorService scheduledExecutor;
private final long pollDurationMillis;
private final String dataSource;
public CoordinatorBasedSegmentHandoffNotifier(
String dataSource,
CoordinatorClient coordinatorClient,
CoordinatorBasedSegmentHandoffNotifierConfig config
)
{
this.dataSource = dataSource;
this.coordinatorClient = coordinatorClient;
this.pollDurationMillis = config.getPollDuration().getMillis();
}
@Override
public boolean registerSegmentHandoffCallback(
SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable
)
{
log.info("Adding SegmentHandoffCallback for dataSource[%s] Segment[%s]", dataSource, descriptor);
Pair<Executor, Runnable> prev = handOffCallbacks.putIfAbsent(
descriptor,
new Pair<>(exec, handOffRunnable)
);
return prev == null;
}
@Override
public void start()
{
scheduledExecutor = Execs.scheduledSingleThreaded("coordinator_handoff_scheduled_%d");
scheduledExecutor.scheduleAtFixedRate(
new Runnable()
{
@Override
public void run()
{
checkForSegmentHandoffs();
}
}, 0L, pollDurationMillis, TimeUnit.MILLISECONDS
);
}
void checkForSegmentHandoffs()
{
try {
Iterator<Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>>> itr = handOffCallbacks.entrySet()
.iterator();
while (itr.hasNext()) {
Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry = itr.next();
SegmentDescriptor descriptor = entry.getKey();
try {
List<ImmutableSegmentLoadInfo> loadedSegments = coordinatorClient.fetchServerView(
dataSource,
descriptor.getInterval(),
true
);
if (isHandOffComplete(loadedSegments, entry.getKey())) {
log.info("Segment Handoff complete for dataSource[%s] Segment[%s]", dataSource, descriptor);
entry.getValue().lhs.execute(entry.getValue().rhs);
itr.remove();
}
}
catch (Exception e) {
log.error(
e,
"Exception while checking handoff for dataSource[%s] Segment[%s], Will try again after [%d]secs",
dataSource,
descriptor,
pollDurationMillis
);
}
}
if (!handOffCallbacks.isEmpty()) {
log.info("Still waiting for Handoff for Segments : [%s]", handOffCallbacks.keySet());
}
}
catch (Throwable t) {
log.error(
t,
"Exception while checking handoff for dataSource[%s] Segment[%s], Will try again after [%d]secs",
dataSource,
pollDurationMillis
);
}
}
static boolean isHandOffComplete(List<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor)
{
for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval())
&& segmentLoadInfo.getSegment().getShardSpec().getPartitionNum()
== descriptor.getPartitionNumber()
&& segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
&& Iterables.any(
segmentLoadInfo.getServers(), new Predicate<DruidServerMetadata>()
{
@Override
public boolean apply(DruidServerMetadata input)
{
return input.isAssignable();
}
}
)) {
return true;
}
}
return false;
}
@Override
public void stop()
{
scheduledExecutor.shutdown();
}
// Used in tests
Map<SegmentDescriptor, Pair<Executor, Runnable>> getHandOffCallbacks()
{
return handOffCallbacks;
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Duration;
import org.joda.time.Period;
public class CoordinatorBasedSegmentHandoffNotifierConfig
{
@JsonProperty
public Duration pollDuration = new Period("PT1M").toStandardDuration();
public Duration getPollDuration()
{
return pollDuration;
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.plumber;
import com.google.inject.Inject;
import io.druid.client.coordinator.CoordinatorClient;
public class CoordinatorBasedSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory
{
private final CoordinatorClient client;
private final CoordinatorBasedSegmentHandoffNotifierConfig config;
@Inject
public CoordinatorBasedSegmentHandoffNotifierFactory(
CoordinatorClient client,
CoordinatorBasedSegmentHandoffNotifierConfig config
)
{
this.client = client;
this.config = config;
}
@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
{
return new CoordinatorBasedSegmentHandoffNotifier(
dataSource,
client,
config
);
}
}

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
@ -42,8 +41,6 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachingQueryRunner;
import io.druid.client.FilteredServerView;
import io.druid.client.ServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.common.guava.ThreadRenamingCallable;
@ -78,7 +75,6 @@ import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
@ -119,7 +115,7 @@ public class RealtimePlumber implements Plumber
private final ExecutorService queryExecutorService;
private final DataSegmentPusher dataSegmentPusher;
private final SegmentPublisher segmentPublisher;
private final FilteredServerView serverView;
private final SegmentHandoffNotifier handoffNotifier;
private final Object handoffCondition = new Object();
private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>(
@ -155,7 +151,7 @@ public class RealtimePlumber implements Plumber
ExecutorService queryExecutorService,
DataSegmentPusher dataSegmentPusher,
SegmentPublisher segmentPublisher,
FilteredServerView serverView,
SegmentHandoffNotifier handoffNotifier,
IndexMerger indexMerger,
IndexIO indexIO,
Cache cache,
@ -173,14 +169,14 @@ public class RealtimePlumber implements Plumber
this.queryExecutorService = queryExecutorService;
this.dataSegmentPusher = dataSegmentPusher;
this.segmentPublisher = segmentPublisher;
this.serverView = serverView;
this.handoffNotifier = handoffNotifier;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
this.objectMapper = objectMapper;
if(!cache.isLocal()) {
if (!cache.isLocal()) {
log.error("Configured cache is not local, caching will not be enabled");
}
@ -212,8 +208,8 @@ public class RealtimePlumber implements Plumber
{
computeBaseDir(schema).mkdirs();
initializeExecutors();
handoffNotifier.start();
Object retVal = bootstrapSinksFromDisk();
registerServerViewCallback();
startPersistThread();
// Push pending sinks bootstrapped from previous run
mergeAndPush();
@ -258,17 +254,8 @@ public class RealtimePlumber implements Plumber
);
retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval));
addSink(retVal);
try {
segmentAnnouncer.announceSegment(retVal.getSegment());
sinks.put(truncatedTime, retVal);
sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), new SingleElementPartitionChunk<Sink>(retVal));
}
catch (IOException e) {
log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource())
.addData("interval", retVal.getInterval())
.emit();
}
}
return retVal;
@ -556,7 +543,7 @@ public class RealtimePlumber implements Plumber
mergedTarget,
config.getIndexSpec()
);
// emit merge metrics before publishing segment
metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime);
metrics.incrementMergeTimeMillis(mergeStopwatch.elapsed(TimeUnit.MILLISECONDS));
@ -597,6 +584,17 @@ public class RealtimePlumber implements Plumber
}
}
);
handoffNotifier.registerSegmentHandoffCallback(
new SegmentDescriptor(sink.getInterval(), sink.getVersion(), config.getShardSpec().getPartitionNum()),
mergeExecutor, new Runnable()
{
@Override
public void run()
{
abandonSegment(sink.getInterval().getStartMillis(), sink);
}
}
);
}
@Override
@ -640,6 +638,7 @@ public class RealtimePlumber implements Plumber
}
}
handoffNotifier.stop();
shutdownExecutors();
stopped = true;
@ -678,11 +677,11 @@ public class RealtimePlumber implements Plumber
protected void shutdownExecutors()
{
// scheduledExecutor is shutdown here, but mergeExecutor is shutdown when the
// ServerView sends it a new segment callback
// scheduledExecutor is shutdown here
if (scheduledExecutor != null) {
scheduledExecutor.shutdown();
persistExecutor.shutdown();
mergeExecutor.shutdown();
}
}
@ -703,7 +702,7 @@ public class RealtimePlumber implements Plumber
Object metadata = null;
long latestCommitTime = 0;
for (File sinkDir : files) {
Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/"));
final Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/"));
//final File[] sinkFiles = sinkDir.listFiles();
// To avoid reading and listing of "merged" dir
@ -735,97 +734,101 @@ public class RealtimePlumber implements Plumber
}
);
boolean isCorrupted = false;
try {
List<FireHydrant> hydrants = Lists.newArrayList();
for (File segmentDir : sinkFiles) {
log.info("Loading previously persisted segment at [%s]", segmentDir);
List<FireHydrant> hydrants = Lists.newArrayList();
for (File segmentDir : sinkFiles) {
log.info("Loading previously persisted segment at [%s]", segmentDir);
// Although this has been tackled at the start of this method,
// this is just a double-check to skip the "merged" dir from being added to hydrants.
// If we are 100% sure that this is not needed, this check can be removed.
if (Ints.tryParse(segmentDir.getName()) == null) {
continue;
}
QueryableIndex queryableIndex = null;
try {
queryableIndex = indexIO.loadIndex(segmentDir);
}
catch (IOException e) {
log.error(e, "Problem loading segmentDir from disk.");
isCorrupted = true;
}
if (isCorrupted) {
try {
File corruptSegmentDir = computeCorruptedFileDumpDir(segmentDir, schema);
log.info("Renaming %s to %s", segmentDir.getAbsolutePath(), corruptSegmentDir.getAbsolutePath());
FileUtils.copyDirectory(segmentDir, corruptSegmentDir);
FileUtils.deleteDirectory(segmentDir);
}
catch (Exception e1) {
log.error(e1, "Failed to rename %s", segmentDir.getAbsolutePath());
}
// Note: skipping corrupted segment might lead to dropping some data. This strategy should be changed
// at some point.
continue;
}
Map<String, Object> segmentMetadata = queryableIndex.getMetaData();
if (segmentMetadata != null) {
Object timestampObj = segmentMetadata.get(COMMIT_METADATA_TIMESTAMP_KEY);
if (timestampObj != null) {
long timestamp = ((Long) timestampObj).longValue();
if (timestamp > latestCommitTime) {
log.info(
"Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]",
queryableIndex.getMetaData(), timestamp, latestCommitTime
);
latestCommitTime = timestamp;
metadata = queryableIndex.getMetaData().get(COMMIT_METADATA_KEY);
}
}
}
hydrants.add(
new FireHydrant(
new QueryableIndexSegment(
DataSegment.makeDataSegmentIdentifier(
schema.getDataSource(),
sinkInterval.getStart(),
sinkInterval.getEnd(),
versioningPolicy.getVersion(sinkInterval),
config.getShardSpec()
),
queryableIndex
),
Integer.parseInt(segmentDir.getName())
)
);
}
if (hydrants.isEmpty()) {
// Probably encountered a corrupt sink directory
log.warn(
"Found persisted segment directory with no intermediate segments present at %s, skipping sink creation.",
sinkDir.getAbsolutePath()
);
// Although this has been tackled at the start of this method,
// this is just a double-check to skip the "merged" dir from being added to hydrants.
// If we are 100% sure that this is not needed, this check can be removed.
if (Ints.tryParse(segmentDir.getName()) == null) {
continue;
}
Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants);
sinks.put(sinkInterval.getStartMillis(), currSink);
sinkTimeline.add(
currSink.getInterval(),
currSink.getVersion(),
new SingleElementPartitionChunk<Sink>(currSink)
QueryableIndex queryableIndex = null;
try {
queryableIndex = indexIO.loadIndex(segmentDir);
}
catch (IOException e) {
log.error(e, "Problem loading segmentDir from disk.");
isCorrupted = true;
}
if (isCorrupted) {
try {
File corruptSegmentDir = computeCorruptedFileDumpDir(segmentDir, schema);
log.info("Renaming %s to %s", segmentDir.getAbsolutePath(), corruptSegmentDir.getAbsolutePath());
FileUtils.copyDirectory(segmentDir, corruptSegmentDir);
FileUtils.deleteDirectory(segmentDir);
}
catch (Exception e1) {
log.error(e1, "Failed to rename %s", segmentDir.getAbsolutePath());
}
// Note: skipping corrupted segment might lead to dropping some data. This strategy should be changed
// at some point.
continue;
}
Map<String, Object> segmentMetadata = queryableIndex.getMetaData();
if (segmentMetadata != null) {
Object timestampObj = segmentMetadata.get(COMMIT_METADATA_TIMESTAMP_KEY);
if (timestampObj != null) {
long timestamp = ((Long) timestampObj).longValue();
if (timestamp > latestCommitTime) {
log.info(
"Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]",
queryableIndex.getMetaData(), timestamp, latestCommitTime
);
latestCommitTime = timestamp;
metadata = queryableIndex.getMetaData().get(COMMIT_METADATA_KEY);
}
}
}
hydrants.add(
new FireHydrant(
new QueryableIndexSegment(
DataSegment.makeDataSegmentIdentifier(
schema.getDataSource(),
sinkInterval.getStart(),
sinkInterval.getEnd(),
versioningPolicy.getVersion(sinkInterval),
config.getShardSpec()
),
queryableIndex
),
Integer.parseInt(segmentDir.getName())
)
);
segmentAnnouncer.announceSegment(currSink.getSegment());
}
catch (IOException e) {
log.makeAlert(e, "Problem loading sink[%s] from disk.", schema.getDataSource())
.addData("interval", sinkInterval)
.emit();
if (hydrants.isEmpty()) {
// Probably encountered a corrupt sink directory
log.warn(
"Found persisted segment directory with no intermediate segments present at %s, skipping sink creation.",
sinkDir.getAbsolutePath()
);
continue;
}
final Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants);
addSink(currSink);
}
return metadata;
}
private void addSink(final Sink sink)
{
sinks.put(sink.getInterval().getStartMillis(), sink);
sinkTimeline.add(
sink.getInterval(),
sink.getVersion(),
new SingleElementPartitionChunk<Sink>(sink)
);
try {
segmentAnnouncer.announceSegment(sink.getSegment());
}
catch (IOException e) {
log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource())
.addData("interval", sink.getInterval())
.emit();
}
}
protected void startPersistThread()
{
final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
@ -939,28 +942,30 @@ public class RealtimePlumber implements Plumber
*/
protected void abandonSegment(final long truncatedTime, final Sink sink)
{
try {
segmentAnnouncer.unannounceSegment(sink.getSegment());
removeSegment(sink, computePersistDir(schema, sink.getInterval()));
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
sinks.remove(truncatedTime);
sinkTimeline.remove(
sink.getInterval(),
sink.getVersion(),
new SingleElementPartitionChunk<>(sink)
);
for (FireHydrant hydrant : sink) {
cache.close(makeHydrantIdentifier(hydrant, hydrant.getSegment()));
if (sinks.containsKey(truncatedTime)) {
try {
segmentAnnouncer.unannounceSegment(sink.getSegment());
removeSegment(sink, computePersistDir(schema, sink.getInterval()));
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
sinks.remove(truncatedTime);
sinkTimeline.remove(
sink.getInterval(),
sink.getVersion(),
new SingleElementPartitionChunk<>(sink)
);
for (FireHydrant hydrant : sink) {
cache.close(makeHydrantIdentifier(hydrant, hydrant.getSegment()));
}
synchronized (handoffCondition) {
handoffCondition.notifyAll();
}
}
synchronized (handoffCondition) {
handoffCondition.notifyAll();
catch (Exception e) {
log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
.addData("interval", sink.getInterval())
.emit();
}
}
catch (Exception e) {
log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
.addData("interval", sink.getInterval())
.emit();
}
}
protected File computeBaseDir(DataSchema schema)
@ -1044,72 +1049,6 @@ public class RealtimePlumber implements Plumber
}
}
private void registerServerViewCallback()
{
serverView.registerSegmentCallback(
mergeExecutor,
new ServerView.BaseSegmentCallback()
{
@Override
public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
{
if (stopped) {
log.info("Unregistering ServerViewCallback");
mergeExecutor.shutdown();
return ServerView.CallbackAction.UNREGISTER;
}
if (!server.isAssignable()) {
return ServerView.CallbackAction.CONTINUE;
}
log.debug("Checking segment[%s] on server[%s]", segment, server);
if (schema.getDataSource().equals(segment.getDataSource())
&& config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum()
) {
final Interval interval = segment.getInterval();
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
final Long sinkKey = entry.getKey();
if (interval.contains(sinkKey)) {
final Sink sink = entry.getValue();
log.info("Segment[%s] matches sink[%s] on server[%s]", segment, sink, server);
final String segmentVersion = segment.getVersion();
final String sinkVersion = sink.getSegment().getVersion();
if (segmentVersion.compareTo(sinkVersion) >= 0) {
log.info("Segment version[%s] >= sink version[%s]", segmentVersion, sinkVersion);
abandonSegment(sinkKey, sink);
}
}
}
}
return ServerView.CallbackAction.CONTINUE;
}
},
new Predicate<DataSegment>()
{
@Override
public boolean apply(final DataSegment segment)
{
return
schema.getDataSource().equalsIgnoreCase(segment.getDataSource())
&& config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum()
&& Iterables.any(
sinks.keySet(), new Predicate<Long>()
{
@Override
public boolean apply(Long sinkKey)
{
return segment.getInterval().contains(sinkKey);
}
}
);
}
}
);
}
private void removeSegment(final Sink sink, final File target)
{
if (target.exists()) {

View File

@ -49,7 +49,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private final DataSegmentPusher dataSegmentPusher;
private final DataSegmentAnnouncer segmentAnnouncer;
private final SegmentPublisher segmentPublisher;
private final FilteredServerView serverView;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final ExecutorService queryExecutorService;
private final IndexMerger indexMerger;
private final IndexIO indexIO;
@ -64,21 +64,21 @@ public class RealtimePlumberSchool implements PlumberSchool
@JacksonInject DataSegmentPusher dataSegmentPusher,
@JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject SegmentPublisher segmentPublisher,
@JacksonInject FilteredServerView serverView,
@JacksonInject SegmentHandoffNotifierFactory handoffNotifierFactory,
@JacksonInject @Processing ExecutorService executorService,
@JacksonInject IndexMerger indexMerger,
@JacksonInject IndexIO indexIO,
@JacksonInject Cache cache,
@JacksonInject CacheConfig cacheConfig,
@JacksonInject ObjectMapper objectMapper
)
)
{
this.emitter = emitter;
this.conglomerate = conglomerate;
this.dataSegmentPusher = dataSegmentPusher;
this.segmentAnnouncer = segmentAnnouncer;
this.segmentPublisher = segmentPublisher;
this.serverView = serverView;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryExecutorService = executorService;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
@ -107,7 +107,7 @@ public class RealtimePlumberSchool implements PlumberSchool
queryExecutorService,
dataSegmentPusher,
segmentPublisher,
serverView,
handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()),
indexMerger,
indexIO,
cache,
@ -122,7 +122,7 @@ public class RealtimePlumberSchool implements PlumberSchool
Preconditions.checkNotNull(dataSegmentPusher, "must specify a segmentPusher to do this action.");
Preconditions.checkNotNull(segmentAnnouncer, "must specify a segmentAnnouncer to do this action.");
Preconditions.checkNotNull(segmentPublisher, "must specify a segmentPublisher to do this action.");
Preconditions.checkNotNull(serverView, "must specify a serverView to do this action.");
Preconditions.checkNotNull(handoffNotifierFactory, "must specify a handoffNotifierFactory to do this action.");
Preconditions.checkNotNull(emitter, "must specify a serviceEmitter to do this action.");
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.plumber;
import io.druid.query.SegmentDescriptor;
import java.util.concurrent.Executor;
public interface SegmentHandoffNotifier
{
/**
* Register a handoff callback to be called when handoff of the given segment is complete.
*
* @param descriptor      segment descriptor of the segment for which the handoff callback is requested
* @param exec            executor used to run the handoff runnable
* @param handOffRunnable runnable to be called when segment handoff is complete
*/
boolean registerSegmentHandoffCallback(
SegmentDescriptor descriptor,
Executor exec,
Runnable handOffRunnable
);
/**
* Perform any initial setup. Should be called before using any other methods, and should be paired
* with a corresponding call to {@link #stop()}.
*/
void start();
/**
* Perform any final processing and clean up after ourselves.
*/
void stop();
}
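For orientation, a rough sketch of how a plumber might drive this interface, modeled on the RealtimePlumber changes elsewhere in this commit; the datasource name, interval, version, partition number, and executor below are illustrative placeholders, and imports are omitted:

SegmentHandoffNotifier notifier =
    handoffNotifierFactory.createSegmentHandoffNotifier("example_ds");
notifier.start();
notifier.registerSegmentHandoffCallback(
    new SegmentDescriptor(new Interval("2015-01-01/2015-01-02"), "v1", 0),
    MoreExecutors.sameThreadExecutor(),
    new Runnable()
    {
      @Override
      public void run()
      {
        // handoff is complete, e.g. a historical now serves this segment,
        // so the local realtime copy can be dropped
      }
    }
);
// once all registered segments are handed off, or the task is shutting down
notifier.stop();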

View File

@ -0,0 +1,26 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.plumber;
public interface SegmentHandoffNotifierFactory
{
SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource);
}

View File

@ -29,12 +29,20 @@ import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.Pair;
import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.client.CoordinatorServerView;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import io.druid.client.ImmutableSegmentLoadInfo;
import io.druid.client.SegmentLoadInfo;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.query.TableDataSource;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineLookup;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.partition.PartitionChunk;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -60,13 +68,15 @@ import java.util.Set;
@Path("/druid/coordinator/v1/datasources")
public class DatasourcesResource
{
private final InventoryView serverInventoryView;
private static final Logger log = new Logger(DatasourcesResource.class);
private final CoordinatorServerView serverInventoryView;
private final MetadataSegmentManager databaseSegmentManager;
private final IndexingServiceClient indexingServiceClient;
@Inject
public DatasourcesResource(
InventoryView serverInventoryView,
CoordinatorServerView serverInventoryView,
MetadataSegmentManager databaseSegmentManager,
@Nullable IndexingServiceClient indexingServiceClient
)
@ -556,4 +566,55 @@ public class DatasourcesResource
segments.put("maxTime", new DateTime(maxTime));
return retVal;
}
/**
* Provides a server view for a datasource and interval, with details about the servers hosting segments in that interval.
* Used by the realtime tasks to fetch a view of the interval they are interested in.
*/
@GET
@Path("/{dataSourceName}/intervals/{interval}/serverview")
@Produces(MediaType.APPLICATION_JSON)
public Response getSegmentDataSourceSpecificInterval(
@PathParam("dataSourceName") String dataSourceName,
@PathParam("interval") String interval,
@QueryParam("partial") final boolean partial
)
{
TimelineLookup<String, SegmentLoadInfo> timeline = serverInventoryView.getTimeline(
new TableDataSource(dataSourceName)
);
final Interval theInterval = new Interval(interval.replace("_", "/"));
if (timeline == null) {
log.debug("No timeline found for datasource[%s]", dataSourceName);
return Response.ok(Lists.<ImmutableSegmentLoadInfo>newArrayList()).build();
}
Iterable<TimelineObjectHolder<String, SegmentLoadInfo>> lookup = timeline.lookupWithIncompletePartitions(theInterval);
FunctionalIterable<ImmutableSegmentLoadInfo> retval = FunctionalIterable
.create(lookup).transformCat(
new Function<TimelineObjectHolder<String, SegmentLoadInfo>, Iterable<ImmutableSegmentLoadInfo>>()
{
@Override
public Iterable<ImmutableSegmentLoadInfo> apply(
TimelineObjectHolder<String, SegmentLoadInfo> input
)
{
return Iterables.transform(
input.getObject(),
new Function<PartitionChunk<SegmentLoadInfo>, ImmutableSegmentLoadInfo>()
{
@Override
public ImmutableSegmentLoadInfo apply(
PartitionChunk<SegmentLoadInfo> chunk
)
{
return chunk.getObject().toImmutableSegmentLoadInfo();
}
}
);
}
}
);
return Response.ok(retval).build();
}
}
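For illustration, the request shape this endpoint expects; the datasource and interval below are hypothetical, and the interval uses `_` in place of `/`, which the handler converts back before the lookup:

GET /druid/coordinator/v1/datasources/example_ds/intervals/2015-01-01T00:00:00.000Z_2015-01-02T00:00:00.000Z/serverview?partial=true

The response is a JSON array of ImmutableSegmentLoadInfo entries, each pairing a segment with the set of servers currently serving it.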

View File

@ -0,0 +1,395 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.Pair;
import io.druid.curator.CuratorTestBase;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.TableDataSource;
import io.druid.segment.Segment;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineLookup;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.PartitionHolder;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
public class CoordinatorServerViewTest extends CuratorTestBase
{
private final ObjectMapper jsonMapper;
private final ZkPathsConfig zkPathsConfig;
private final String announcementsPath;
private final String inventoryPath;
private CountDownLatch segmentViewInitLatch;
private CountDownLatch segmentAddedLatch;
private CountDownLatch segmentRemovedLatch;
private ServerInventoryView baseView;
private CoordinatorServerView overlordServerView;
public CoordinatorServerViewTest()
{
jsonMapper = new DefaultObjectMapper();
zkPathsConfig = new ZkPathsConfig();
announcementsPath = zkPathsConfig.getAnnouncementsPath();
inventoryPath = zkPathsConfig.getLiveSegmentsPath();
}
@Before
public void setUp() throws Exception
{
setupServerAndCurator();
curator.start();
}
@Test
public void testSingleServerAddedRemovedSegment() throws Exception
{
segmentViewInitLatch = new CountDownLatch(1);
segmentAddedLatch = new CountDownLatch(1);
segmentRemovedLatch = new CountDownLatch(1);
setupViews();
final DruidServer druidServer = new DruidServer(
"localhost:1234",
"localhost:1234",
10000000L,
"historical",
"default_tier",
0
);
setupZNodeForServer(druidServer);
final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1");
announceSegmentForServer(druidServer, segment);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
TimelineLookup timeline = overlordServerView.getTimeline(new TableDataSource("test_overlord_server_view"));
List<TimelineObjectHolder> serverLookupRes = (List<TimelineObjectHolder>) timeline.lookup(
new Interval(
"2014-10-20T00:00:00Z/P1D"
)
);
Assert.assertEquals(1, serverLookupRes.size());
TimelineObjectHolder<String, SegmentLoadInfo> actualTimelineObjectHolder = serverLookupRes.get(0);
Assert.assertEquals(new Interval("2014-10-20T00:00:00Z/P1D"), actualTimelineObjectHolder.getInterval());
Assert.assertEquals("v1", actualTimelineObjectHolder.getVersion());
PartitionHolder<SegmentLoadInfo> actualPartitionHolder = actualTimelineObjectHolder.getObject();
Assert.assertTrue(actualPartitionHolder.isComplete());
Assert.assertEquals(1, Iterables.size(actualPartitionHolder));
SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject();
Assert.assertFalse(segmentLoadInfo.isEmpty());
Assert.assertEquals(druidServer.getMetadata(), Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers()));
unannounceSegmentForServer(druidServer, segment);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
Assert.assertEquals(
0,
((List<TimelineObjectHolder>) timeline.lookup(new Interval("2014-10-20T00:00:00Z/P1D"))).size()
);
Assert.assertNull(timeline.findEntry(new Interval("2014-10-20T00:00:00Z/P1D"), "v1"));
}
@Test
public void testMultipleServerAddedRemovedSegment() throws Exception
{
segmentViewInitLatch = new CountDownLatch(1);
segmentAddedLatch = new CountDownLatch(5);
// temporarily set the removed-segment latch count to 1; it is renewed below once the first segment is unannounced
segmentRemovedLatch = new CountDownLatch(1);
setupViews();
final List<DruidServer> druidServers = Lists.transform(
ImmutableList.<String>of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
new Function<String, DruidServer>()
{
@Override
public DruidServer apply(String input)
{
return new DruidServer(
input,
input,
10000000L,
"historical",
"default_tier",
0
);
}
}
);
for (DruidServer druidServer : druidServers) {
setupZNodeForServer(druidServer);
}
final List<DataSegment> segments = Lists.transform(
ImmutableList.<Pair<String, String>>of(
Pair.of("2011-04-01/2011-04-03", "v1"),
Pair.of("2011-04-03/2011-04-06", "v1"),
Pair.of("2011-04-01/2011-04-09", "v2"),
Pair.of("2011-04-06/2011-04-09", "v3"),
Pair.of("2011-04-01/2011-04-02", "v3")
), new Function<Pair<String, String>, DataSegment>()
{
@Override
public DataSegment apply(Pair<String, String> input)
{
return dataSegmentWithIntervalAndVersion(input.lhs, input.rhs);
}
}
);
for (int i = 0; i < 5; ++i) {
announceSegmentForServer(druidServers.get(i), segments.get(i));
}
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
TimelineLookup timeline = overlordServerView.getTimeline(new TableDataSource("test_overlord_server_view"));
assertValues(
Arrays.asList(
createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)),
createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3))
),
(List<TimelineObjectHolder>) timeline.lookup(
new Interval(
"2011-04-01/2011-04-09"
)
)
);
// unannounce the segment created by dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-09", "v2")
unannounceSegmentForServer(druidServers.get(2), segments.get(2));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
// renew segmentRemovedLatch since we still have 4 segments to unannounce
segmentRemovedLatch = new CountDownLatch(4);
timeline = overlordServerView.getTimeline(new TableDataSource("test_overlord_server_view"));
assertValues(
Arrays.asList(
createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
createExpected("2011-04-02/2011-04-03", "v1", druidServers.get(0), segments.get(0)),
createExpected("2011-04-03/2011-04-06", "v1", druidServers.get(1), segments.get(1)),
createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3))
),
(List<TimelineObjectHolder>) timeline.lookup(
new Interval(
"2011-04-01/2011-04-09"
)
)
);
// unannounce all the segments
for (int i = 0; i < 5; ++i) {
// skip the one that was previously unannounced
if (i != 2) {
unannounceSegmentForServer(druidServers.get(i), segments.get(i));
}
}
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
Assert.assertEquals(
0,
((List<TimelineObjectHolder>) timeline.lookup(new Interval("2011-04-01/2011-04-09"))).size()
);
}
private void announceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception
{
curator.create()
.compressed()
.withMode(CreateMode.EPHEMERAL)
.forPath(
ZKPaths.makePath(ZKPaths.makePath(inventoryPath, druidServer.getHost()), segment.getIdentifier()),
jsonMapper.writeValueAsBytes(
ImmutableSet.<DataSegment>of(segment)
)
);
}
private void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception
{
curator.delete().guaranteed().forPath(
ZKPaths.makePath(
ZKPaths.makePath(inventoryPath, druidServer.getHost()),
segment.getIdentifier()
)
);
}
private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected(
String intervalStr,
String version,
DruidServer druidServer,
DataSegment segment
)
{
return Pair.of(new Interval(intervalStr), Pair.of(version, Pair.of(druidServer, segment)));
}
private void assertValues(
List<Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>>> expected, List<TimelineObjectHolder> actual
)
{
Assert.assertEquals(expected.size(), actual.size());
for (int i = 0; i < expected.size(); ++i) {
Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> expectedPair = expected.get(i);
TimelineObjectHolder<String,SegmentLoadInfo> actualTimelineObjectHolder = actual.get(i);
Assert.assertEquals(expectedPair.lhs, actualTimelineObjectHolder.getInterval());
Assert.assertEquals(expectedPair.rhs.lhs, actualTimelineObjectHolder.getVersion());
PartitionHolder<SegmentLoadInfo> actualPartitionHolder = actualTimelineObjectHolder.getObject();
Assert.assertTrue(actualPartitionHolder.isComplete());
Assert.assertEquals(1, Iterables.size(actualPartitionHolder));
SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject();
Assert.assertFalse(segmentLoadInfo.isEmpty());
Assert.assertEquals(expectedPair.rhs.rhs.lhs.getMetadata(),Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers()));
}
}
private void setupViews() throws Exception
{
baseView = new BatchServerInventoryView(
zkPathsConfig,
curator,
jsonMapper,
Predicates.<DataSegment>alwaysTrue()
)
{
@Override
public void registerSegmentCallback(Executor exec, final SegmentCallback callback)
{
super.registerSegmentCallback(
exec, new SegmentCallback()
{
@Override
public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
{
CallbackAction res = callback.segmentAdded(server, segment);
segmentAddedLatch.countDown();
return res;
}
@Override
public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
{
CallbackAction res = callback.segmentRemoved(server, segment);
segmentRemovedLatch.countDown();
return res;
}
@Override
public CallbackAction segmentViewInitialized()
{
CallbackAction res = callback.segmentViewInitialized();
segmentViewInitLatch.countDown();
return res;
}
}
);
}
};
overlordServerView = new CoordinatorServerView(
baseView
);
baseView.start();
}
private void setupZNodeForServer(DruidServer server) throws Exception
{
curator.create()
.creatingParentsIfNeeded()
.forPath(
ZKPaths.makePath(announcementsPath, server.getHost()),
jsonMapper.writeValueAsBytes(server.getMetadata())
);
curator.create()
.creatingParentsIfNeeded()
.forPath(ZKPaths.makePath(inventoryPath, server.getHost()));
}
private DataSegment dataSegmentWithIntervalAndVersion(String intervalStr, String version)
{
return DataSegment.builder()
.dataSource("test_overlord_server_view")
.interval(new Interval(intervalStr))
.loadSpec(
ImmutableMap.<String, Object>of(
"type",
"local",
"path",
"somewhere"
)
)
.version(version)
.dimensions(ImmutableList.<String>of())
.metrics(ImmutableList.<String>of())
.shardSpec(new NoneShardSpec())
.binaryVersion(9)
.size(0)
.build();
}
@After
public void tearDown() throws Exception
{
baseView.stop();
tearDownServerAndCurator();
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import io.druid.client.ImmutableSegmentLoadInfo;
import io.druid.client.SegmentLoadInfo;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
import org.joda.time.Interval;
import org.junit.Test;
import java.io.IOException;
public class ImmutableSegmentLoadInfoTest
{
private final ObjectMapper mapper = new DefaultObjectMapper();
@Test
public void testSerde() throws IOException
{
ImmutableSegmentLoadInfo segmentLoadInfo = new ImmutableSegmentLoadInfo(
new DataSegment(
"test_ds",
new Interval(
"2011-04-01/2011-04-02"
),
"v1",
null,
null,
null,
new NoneShardSpec(),
0, 0
), Sets.newHashSet(new DruidServerMetadata("a", "host", 10, "type", "tier", 1))
);
ImmutableSegmentLoadInfo serde = mapper.readValue(
mapper.writeValueAsBytes(segmentLoadInfo),
ImmutableSegmentLoadInfo.class
);
Assert.assertEquals(segmentLoadInfo, serde);
}
}

View File

@ -0,0 +1,357 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.plumber;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.ImmutableSegmentLoadInfo;
import io.druid.client.SegmentLoadInfo;
import io.druid.client.coordinator.CoordinatorClient;
import io.druid.query.SegmentDescriptor;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import junit.framework.Assert;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class CoordinatorBasedSegmentHandoffNotifierTest
{
private final CoordinatorBasedSegmentHandoffNotifierConfig notifierConfig = new CoordinatorBasedSegmentHandoffNotifierConfig()
{
@Override
public Duration getPollDuration()
{
return Duration.millis(10);
}
};
@Test
public void testHandoffCallbackNotCalled() throws IOException, InterruptedException
{
Interval interval = new Interval(
"2011-04-01/2011-04-02"
);
SegmentDescriptor descriptor = new SegmentDescriptor(
interval, "v1", 2
);
DataSegment segment = new DataSegment(
"test_ds",
interval,
"v1",
null,
null,
null,
new NumberedShardSpec(2, 3),
0, 0
);
CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class);
EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true))
.andReturn(
Lists.newArrayList(
new ImmutableSegmentLoadInfo(
segment,
Sets.newHashSet(
createRealtimeServerMetadata("a1")
)
)
)
).anyTimes();
EasyMock.replay(coordinatorClient);
CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier(
"test_ds",
coordinatorClient,
notifierConfig
);
final AtomicBoolean callbackCalled = new AtomicBoolean(false);
notifier.registerSegmentHandoffCallback(
descriptor, MoreExecutors.sameThreadExecutor(), new Runnable()
{
@Override
public void run()
{
callbackCalled.set(true);
}
}
);
notifier.checkForSegmentHandoffs();
// the callback should still be registered since handoff is not yet complete
Assert.assertEquals(1, notifier.getHandOffCallbacks().size());
Assert.assertTrue(notifier.getHandOffCallbacks().containsKey(descriptor));
Assert.assertFalse(callbackCalled.get());
EasyMock.verify(coordinatorClient);
}
@Test
public void testHandoffCallbackCalled() throws IOException, InterruptedException
{
Interval interval = new Interval(
"2011-04-01/2011-04-02"
);
SegmentDescriptor descriptor = new SegmentDescriptor(
interval, "v1", 2
);
DataSegment segment = new DataSegment(
"test_ds",
interval,
"v1",
null,
null,
null,
new NumberedShardSpec(2, 3),
0, 0
);
final AtomicBoolean callbackCalled = new AtomicBoolean(false);
CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class);
EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true))
.andReturn(
Lists.newArrayList(
new ImmutableSegmentLoadInfo(
segment,
Sets.newHashSet(
createHistoricalServerMetadata("a1")
)
)
)
).anyTimes();
EasyMock.replay(coordinatorClient);
CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier(
"test_ds",
coordinatorClient,
notifierConfig
);
notifier.registerSegmentHandoffCallback(
descriptor, MoreExecutors.sameThreadExecutor(), new Runnable()
{
@Override
public void run()
{
callbackCalled.set(true);
}
}
);
Assert.assertEquals(1, notifier.getHandOffCallbacks().size());
Assert.assertTrue(notifier.getHandOffCallbacks().containsKey(descriptor));
notifier.checkForSegmentHandoffs();
// callback should have been removed
Assert.assertTrue(notifier.getHandOffCallbacks().isEmpty());
Assert.assertTrue(callbackCalled.get());
EasyMock.verify(coordinatorClient);
}
@Test
public void testHandoffChecksForVersion()
{
Interval interval = new Interval(
"2011-04-01/2011-04-02"
);
Assert.assertFalse(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Lists.newArrayList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v2", 2)
)
);
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Lists.newArrayList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v2", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Lists.newArrayList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
}
@Test
public void testHandoffChecksForAssignableServer()
{
Interval interval = new Interval(
"2011-04-01/2011-04-02"
);
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Lists.newArrayList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
Assert.assertFalse(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Lists.newArrayList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createRealtimeServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
}
@Test
public void testHandoffChecksForPartitionNumber()
{
Interval interval = new Interval(
"2011-04-01/2011-04-02"
);
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Lists.newArrayList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 1),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 1)
)
);
Assert.assertFalse(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Lists.newArrayList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 1),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
}
@Test
public void testHandoffChecksForInterval()
{
Assert.assertFalse(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Lists.newArrayList(
new ImmutableSegmentLoadInfo(
createSegment(
new Interval(
"2011-04-01/2011-04-02"
), "v1", 1
),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(
new Interval(
"2011-04-01/2011-04-03"
), "v1", 1
)
)
);
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Lists.newArrayList(
new ImmutableSegmentLoadInfo(
createSegment(
new Interval(
"2011-04-01/2011-04-04"
), "v1", 1
),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(
new Interval(
"2011-04-02/2011-04-03"
), "v1", 1
)
)
);
}
private DruidServerMetadata createRealtimeServerMetadata(String name)
{
return createServerMetadata(name, "realtime");
}
private DruidServerMetadata createHistoricalServerMetadata(String name)
{
return createServerMetadata(name, "historical");
}
private DruidServerMetadata createServerMetadata(String name, String type)
{
return new DruidServerMetadata(
name,
name,
10000,
type,
"tier",
1
);
}
private DataSegment createSegment(Interval interval, String version, int partitionNumber)
{
return new DataSegment(
"test_ds",
interval,
version,
null,
null,
null,
new NumberedShardSpec(partitionNumber, 100),
0, 0
);
}
}

View File

@ -43,6 +43,7 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.TestHelper;
@ -89,7 +90,8 @@ public class RealtimePlumberSchoolTest
private DataSegmentAnnouncer announcer;
private SegmentPublisher segmentPublisher;
private DataSegmentPusher dataSegmentPusher;
private FilteredServerView serverView;
private SegmentHandoffNotifier handoffNotifier;
private SegmentHandoffNotifierFactory handoffNotifierFactory;
private ServiceEmitter emitter;
private RealtimeTuningConfig tuningConfig;
private DataSchema schema;
@ -162,17 +164,20 @@ public class RealtimePlumberSchoolTest
segmentPublisher = EasyMock.createNiceMock(SegmentPublisher.class);
dataSegmentPusher = EasyMock.createNiceMock(DataSegmentPusher.class);
serverView = EasyMock.createMock(FilteredServerView.class);
serverView.registerSegmentCallback(
EasyMock.<Executor>anyObject(),
EasyMock.<ServerView.SegmentCallback>anyObject(),
EasyMock.<Predicate<DataSegment>>anyObject()
);
EasyMock.expectLastCall().anyTimes();
handoffNotifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
handoffNotifier = EasyMock.createNiceMock(SegmentHandoffNotifier.class);
EasyMock.expect(handoffNotifierFactory.createSegmentHandoffNotifier(EasyMock.anyString())).andReturn(handoffNotifier).anyTimes();
EasyMock.expect(
handoffNotifier.registerSegmentHandoffCallback(
EasyMock.<SegmentDescriptor>anyObject(),
EasyMock.<Executor>anyObject(),
EasyMock.<Runnable>anyObject()
)
).andReturn(true).anyTimes();
emitter = EasyMock.createMock(ServiceEmitter.class);
EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, handoffNotifier, emitter);
tuningConfig = new RealtimeTuningConfig(
1,
@ -192,7 +197,7 @@ public class RealtimePlumberSchoolTest
dataSegmentPusher,
announcer,
segmentPublisher,
serverView,
handoffNotifierFactory,
MoreExecutors.sameThreadExecutor(),
TestHelper.getTestIndexMerger(),
TestHelper.getTestIndexIO(),
@ -208,7 +213,7 @@ public class RealtimePlumberSchoolTest
@After
public void tearDown() throws Exception
{
EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, handoffNotifier, emitter);
FileUtils.deleteDirectory(
new File(
tuningConfig.getBasePersistDirectory(),
@ -335,15 +340,15 @@ public class RealtimePlumberSchoolTest
RealtimePlumber plumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics);
plumber2.getSinks()
.put(
0L,
new Sink(
testInterval,
schema2,
tuningConfig,
new DateTime("2014-12-01T12:34:56.789").toString()
)
);
.put(
0L,
new Sink(
testInterval,
schema2,
tuningConfig,
new DateTime("2014-12-01T12:34:56.789").toString()
)
);
Assert.assertNull(plumber2.startJob());
final Committer committer = new Committer()
@ -377,14 +382,18 @@ public class RealtimePlumberSchoolTest
File persistDir = plumber2.computePersistDir(schema2, testInterval);
/* Check that all hydrants were persisted */
for (int i = 0; i < 5; i ++) {
for (int i = 0; i < 5; i++) {
Assert.assertTrue(new File(persistDir, String.valueOf(i)).exists());
}
/* Create some gaps in the persisted hydrants and reload */
FileUtils.deleteDirectory(new File(persistDir, "1"));
FileUtils.deleteDirectory(new File(persistDir, "3"));
RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics);
RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(
schema2,
tuningConfig,
metrics
);
restoredPlumber.bootstrapSinksFromDisk();
Map<Long, Sink> sinks = restoredPlumber.getSinks();
@ -393,26 +402,37 @@ public class RealtimePlumberSchoolTest
List<FireHydrant> hydrants = Lists.newArrayList(sinks.get(new Long(0)));
DateTime startTime = new DateTime("1970-01-01T00:00:00.000Z");
Assert.assertEquals(0, hydrants.get(0).getCount());
Assert.assertEquals(new Interval(startTime, new DateTime("1970-01-01T00:00:00.001Z")),
hydrants.get(0).getSegment().getDataInterval());
Assert.assertEquals(
new Interval(startTime, new DateTime("1970-01-01T00:00:00.001Z")),
hydrants.get(0).getSegment().getDataInterval()
);
Assert.assertEquals(2, hydrants.get(1).getCount());
Assert.assertEquals(new Interval(startTime, new DateTime("1970-03-01T00:00:00.001Z")),
hydrants.get(1).getSegment().getDataInterval());
Assert.assertEquals(
new Interval(startTime, new DateTime("1970-03-01T00:00:00.001Z")),
hydrants.get(1).getSegment().getDataInterval()
);
Assert.assertEquals(4, hydrants.get(2).getCount());
Assert.assertEquals(new Interval(startTime, new DateTime("1970-05-01T00:00:00.001Z")),
hydrants.get(2).getSegment().getDataInterval());
Assert.assertEquals(
new Interval(startTime, new DateTime("1970-05-01T00:00:00.001Z")),
hydrants.get(2).getSegment().getDataInterval()
);
/* Delete all the hydrants and reload, no sink should be created */
FileUtils.deleteDirectory(new File(persistDir, "0"));
FileUtils.deleteDirectory(new File(persistDir, "2"));
FileUtils.deleteDirectory(new File(persistDir, "4"));
RealtimePlumber restoredPlumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics);
RealtimePlumber restoredPlumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(
schema2,
tuningConfig,
metrics
);
restoredPlumber2.bootstrapSinksFromDisk();
Assert.assertEquals(0, restoredPlumber2.getSinks().size());
}
private InputRow getTestInputRow(final String timeStr) {
private InputRow getTestInputRow(final String timeStr)
{
return new InputRow()
{
@Override

View File

@ -21,6 +21,7 @@ package io.druid.server.http;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.druid.client.CoordinatorServerView;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
@ -42,7 +43,7 @@ import java.util.TreeSet;
public class DatasourcesResourceTest
{
private InventoryView inventoryView;
private CoordinatorServerView inventoryView;
private DruidServer server;
private List<DruidDataSource> listDataSources;
private List<DataSegment> dataSegmentList;
@ -50,7 +51,7 @@ public class DatasourcesResourceTest
@Before
public void setUp()
{
inventoryView = EasyMock.createStrictMock(InventoryView.class);
inventoryView = EasyMock.createStrictMock(CoordinatorServerView.class);
server = EasyMock.createStrictMock(DruidServer.class);
dataSegmentList = new ArrayList<>();
dataSegmentList.add(

View File

@ -29,6 +29,7 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.logger.Logger;
import io.airlift.airline.Command;
import io.druid.audit.AuditManager;
import io.druid.client.CoordinatorServerView;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.guice.ConfigProvider;
import io.druid.guice.Jerseys;
@ -119,6 +120,7 @@ public class CliCoordinator extends ServerRunnable
.in(ManageLifecycle.class);
binder.bind(IndexingServiceClient.class).in(LazySingleton.class);
binder.bind(CoordinatorServerView.class).in(LazySingleton.class);
binder.bind(DruidCoordinator.class);
@ -138,6 +140,8 @@ public class CliCoordinator extends ServerRunnable
Jerseys.addResource(binder, IntervalsResource.class);
LifecycleModule.register(binder, Server.class);
LifecycleModule.register(binder, DatasourcesResource.class);
}
@Provides

View File

@ -33,6 +33,7 @@ import io.airlift.airline.Arguments;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import io.druid.client.cache.CacheConfig;
import io.druid.client.coordinator.CoordinatorClient;
import io.druid.guice.Binders;
import io.druid.guice.CacheModule;
import io.druid.guice.IndexingServiceFirehoseModule;
@ -73,6 +74,9 @@ import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.segment.realtime.firehose.ChatHandlerResource;
import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig;
import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.QueryResource;
import io.druid.server.initialization.jetty.ChatHandlerServerModule;
import io.druid.server.initialization.jetty.JettyServerInitializer;
@ -130,7 +134,7 @@ public class CliPeon extends GuiceRunnable
handlerProviderBinder.addBinding("noop")
.to(NoopChatHandlerProvider.class).in(LazySingleton.class);
binder.bind(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);
binder.bind(NoopChatHandlerProvider.class).in(LazySingleton.class);
binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
@ -164,12 +168,22 @@ public class CliPeon extends GuiceRunnable
JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
binder.install(new CacheModule());
JsonConfigProvider.bind(
binder,
"druid.segment.handoff",
CoordinatorBasedSegmentHandoffNotifierConfig.class
);
binder.bind(SegmentHandoffNotifierFactory.class)
.to(CoordinatorBasedSegmentHandoffNotifierFactory.class)
.in(LazySingleton.class);
// Override the default SegmentLoaderConfig because we don't actually care about the
// configuration based locations. This will override them anyway. This is also stopping
// configuration of other parameters, but I don't think that's actually a problem.
// Note, if that is actually not a problem, then that probably means we have the wrong abstraction.
binder.bind(SegmentLoaderConfig.class)
.toInstance(new SegmentLoaderConfig().withLocations(Arrays.<StorageLocationConfig>asList()));
binder.bind(CoordinatorClient.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
Jerseys.addResource(binder, QueryResource.class);

View File

@ -26,6 +26,7 @@ import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import io.druid.cli.QueryJettyServerInitializer;
import io.druid.client.cache.CacheConfig;
import io.druid.client.coordinator.CoordinatorClient;
import io.druid.metadata.MetadataSegmentPublisher;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.realtime.FireDepartment;
@ -36,6 +37,9 @@ import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.segment.realtime.firehose.ChatHandlerResource;
import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig;
import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.QueryResource;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import org.eclipse.jetty.server.Server;
@ -79,10 +83,20 @@ public class RealtimeModule implements Module
.to(NoopChatHandlerProvider.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
binder.bind(new TypeLiteral<List<FireDepartment>>(){})
binder.bind(
new TypeLiteral<List<FireDepartment>>()
{
}
)
.toProvider(FireDepartmentsProvider.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.segment.handoff", CoordinatorBasedSegmentHandoffNotifierConfig.class);
binder.bind(SegmentHandoffNotifierFactory.class)
.to(CoordinatorBasedSegmentHandoffNotifierFactory.class)
.in(LazySingleton.class);
binder.bind(CoordinatorClient.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
binder.install(new CacheModule());