Optional long-polling based segment announcement via HTTP instead of Zookeeper (#3902)

* Optional long-polling based segment announcement via HTTP instead of Zookeeper

* address review comments

* make endpoint /druid-internal/v1 instead of /druid/internal so that Jetty QoS filters can be configured easily when needed

* update segment callback initialization so it is called only after the first segment list fetch has succeeded from all servers

* address review comments

* remove size check that is no longer required, since only segment servers announce themselves and not all peon processes

* announce segment server on historical only after cached segments are loaded

* fix checkstyle errors
Himanshu 2017-05-17 16:31:58 -05:00 committed by GitHub
parent 0e056863e4
commit daa8ef8658
55 changed files with 2768 additions and 516 deletions
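For orientation before the file-by-file diff: the new HTTP announcement path works by having each consumer (the HttpServerInventoryView added below) issue a long-polling GET against /druid-internal/v1/segments on every discovered segment server, passing the last seen counter and hash, and re-polling as soon as a delta (or a 204 timeout) comes back. The following is a minimal hedged sketch of that request loop; the endpoint and query parameters are taken from the code in this commit, but the host/port and the use of plain java.net.HttpURLConnection (instead of Druid's async HTTP client with Smile encoding) are illustrative only.

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

// Hedged sketch of the long-poll loop performed per discovered server.
public class SegmentLongPollSketch
{
  public static void main(String[] args) throws Exception
  {
    String server = "http://localhost:8083";      // hypothetical historical
    long counter = -1;                            // -1 requests a full snapshot first
    String hash = "";                             // echoed back from each delta response
    long timeoutMs = 4 * 60 * 1000;               // HttpServerInventoryViewConfig default

    while (true) {
      String query = counter < 0
                     ? String.format("counter=-1&timeout=%s", timeoutMs)
                     : String.format("counter=%s&hash=%s&timeout=%s", counter, hash, timeoutMs);
      URL url = new URL(server + "/druid-internal/v1/segments?" + query);
      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      conn.setReadTimeout((int) timeoutMs + 1000); // wait slightly longer than the server holds

      if (conn.getResponseCode() == HttpURLConnection.HTTP_NO_CONTENT) {
        continue; // nothing changed within the timeout; poll again right away
      }
      try (InputStream in = conn.getInputStream()) {
        // The real client deserializes a SegmentChangeRequestsSnapshot here, applies each
        // DataSegmentChangeRequest to its view of the server, and stores the new counter/hash
        // before re-polling. Parsing is omitted in this sketch.
      }
    }
  }
}

The first request (counter=-1) returns a full snapshot; afterwards the server holds each request open for up to the timeout and answers early only when its segment set changes, which is what replaces the Zookeeper watch on this path.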


@@ -286,6 +286,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
     final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
 ) {
+  toolbox.getDataSegmentServerAnnouncer().announce();
+
   appenderator = appenderator0;
   final String topic = ioConfig.getStartPartitions().getTopic();
@@ -567,6 +569,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     }
   }
+
+  toolbox.getDataSegmentServerAnnouncer().unannounce();
   return success();
 }


@@ -112,6 +112,7 @@ import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import io.druid.segment.loading.StorageLocationConfig;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
+import io.druid.server.coordination.DataSegmentServerAnnouncer;
 import io.druid.timeline.DataSegment;
 import org.apache.curator.test.TestingCluster;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -1522,6 +1523,7 @@ public class KafkaIndexTaskTest
       null, // DataSegmentMover
       null, // DataSegmentArchiver
       new TestDataSegmentAnnouncer(),
+      EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
       handoffNotifierFactory,
       makeTimeseriesOnlyConglomerate(),
       MoreExecutors.sameThreadExecutor(), // queryExecutorService


@@ -46,6 +46,7 @@ import io.druid.segment.loading.SegmentLoader;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import io.druid.server.coordination.DataSegmentAnnouncer;
+import io.druid.server.coordination.DataSegmentServerAnnouncer;
 import io.druid.timeline.DataSegment;
 import org.joda.time.Interval;
@@ -70,6 +71,7 @@ public class TaskToolbox
   private final DataSegmentArchiver dataSegmentArchiver;
   private final DataSegmentMover dataSegmentMover;
   private final DataSegmentAnnouncer segmentAnnouncer;
+  private final DataSegmentServerAnnouncer serverAnnouncer;
   private final SegmentHandoffNotifierFactory handoffNotifierFactory;
   private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
   private final MonitorScheduler monitorScheduler;
@@ -93,6 +95,7 @@ public class TaskToolbox
       DataSegmentMover dataSegmentMover,
       DataSegmentArchiver dataSegmentArchiver,
       DataSegmentAnnouncer segmentAnnouncer,
+      DataSegmentServerAnnouncer serverAnnouncer,
       SegmentHandoffNotifierFactory handoffNotifierFactory,
       QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
       ExecutorService queryExecutorService,
@@ -116,6 +119,7 @@ public class TaskToolbox
     this.dataSegmentMover = dataSegmentMover;
     this.dataSegmentArchiver = dataSegmentArchiver;
     this.segmentAnnouncer = segmentAnnouncer;
+    this.serverAnnouncer = serverAnnouncer;
     this.handoffNotifierFactory = handoffNotifierFactory;
     this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
     this.queryExecutorService = queryExecutorService;
@@ -170,6 +174,11 @@ public class TaskToolbox
     return segmentAnnouncer;
   }
+
+  public DataSegmentServerAnnouncer getDataSegmentServerAnnouncer()
+  {
+    return serverAnnouncer;
+  }
   public SegmentHandoffNotifierFactory getSegmentHandoffNotifierFactory()
   {
     return handoffNotifierFactory;


@@ -40,6 +40,7 @@ import io.druid.segment.loading.DataSegmentMover;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import io.druid.server.coordination.DataSegmentAnnouncer;
+import io.druid.server.coordination.DataSegmentServerAnnouncer;
 import java.io.File;
 import java.util.concurrent.ExecutorService;
@@ -57,6 +58,7 @@ public class TaskToolboxFactory
   private final DataSegmentMover dataSegmentMover;
   private final DataSegmentArchiver dataSegmentArchiver;
   private final DataSegmentAnnouncer segmentAnnouncer;
+  private final DataSegmentServerAnnouncer serverAnnouncer;
   private final SegmentHandoffNotifierFactory handoffNotifierFactory;
   private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
   private final ExecutorService queryExecutorService;
@@ -79,6 +81,7 @@ public class TaskToolboxFactory
       DataSegmentMover dataSegmentMover,
       DataSegmentArchiver dataSegmentArchiver,
       DataSegmentAnnouncer segmentAnnouncer,
+      DataSegmentServerAnnouncer serverAnnouncer,
       SegmentHandoffNotifierFactory handoffNotifierFactory,
       QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
       @Processing ExecutorService queryExecutorService,
@@ -100,6 +103,7 @@ public class TaskToolboxFactory
     this.dataSegmentMover = dataSegmentMover;
     this.dataSegmentArchiver = dataSegmentArchiver;
     this.segmentAnnouncer = segmentAnnouncer;
+    this.serverAnnouncer = serverAnnouncer;
     this.handoffNotifierFactory = handoffNotifierFactory;
     this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
     this.queryExecutorService = queryExecutorService;
@@ -126,6 +130,7 @@ public class TaskToolboxFactory
       dataSegmentMover,
       dataSegmentArchiver,
       segmentAnnouncer,
+      serverAnnouncer,
       handoffNotifierFactory,
       queryRunnerFactoryConglomerate,
       queryExecutorService,


@@ -248,12 +248,6 @@ public class RealtimeIndexTask extends AbstractTask
           }
         }
       }
-
-      @Override
-      public boolean isAnnounced(DataSegment segment)
-      {
-        return toolbox.getSegmentAnnouncer().isAnnounced(segment);
-      }
     };
     // NOTE: getVersion will block if there is lock contention, which will block plumber.getSink
@@ -326,6 +320,8 @@ public class RealtimeIndexTask extends AbstractTask
     Supplier<Committer> committerSupplier = null;
     try {
+      toolbox.getDataSegmentServerAnnouncer().announce();
+
       plumber.startJob();
       // Set up metrics emission
@@ -425,6 +421,8 @@ public class RealtimeIndexTask extends AbstractTask
         toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
       }
     }
+
+    toolbox.getDataSegmentServerAnnouncer().unannounce();
   }
   log.info("Job done!");


@@ -41,6 +41,7 @@ import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import io.druid.server.coordination.DataSegmentAnnouncer;
+import io.druid.server.coordination.DataSegmentServerAnnouncer;
 import io.druid.timeline.DataSegment;
 import org.easymock.EasyMock;
 import org.joda.time.Interval;
@@ -102,6 +103,7 @@ public class TaskToolboxTest
         mockDataSegmentMover,
         mockDataSegmentArchiver,
         mockSegmentAnnouncer,
+        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         mockHandoffNotifierFactory,
         mockQueryRunnerFactoryConglomerate,
         mockQueryExecutorService,


@@ -550,7 +550,7 @@ public class IndexTaskTest
             segments.add(segment);
             return segment;
           }
-        }, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(),
+        }, null, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(),
         indexMerger, indexIO, null, null, indexMergerV9
       )
     );


@@ -101,6 +101,7 @@ import io.druid.segment.realtime.FireDepartment;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
+import io.druid.server.coordination.DataSegmentServerAnnouncer;
 import io.druid.timeline.DataSegment;
 import org.easymock.EasyMock;
 import org.hamcrest.CoreMatchers;
@@ -1042,6 +1043,7 @@ public class RealtimeIndexTaskTest
         null, // DataSegmentMover
         null, // DataSegmentArchiver
         new TestDataSegmentAnnouncer(),
+        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         handoffNotifierFactory,
         conglomerate,
         MoreExecutors.sameThreadExecutor(), // queryExecutorService


@@ -200,7 +200,7 @@ public class SameIntervalMergeTaskTest
           segments.add(segment);
           return segment;
         }
-      }, null, null, null, null, null, null, null, null, new SegmentLoader()
+      }, null, null, null, null, null, null, null, null, null, new SegmentLoader()
       {
         @Override
         public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException


@@ -270,6 +270,7 @@ public class IngestSegmentFirehoseFactoryTest
           }
         },
         null, // segment announcer
+        null,
         notifierFactory,
         null, // query runner factory conglomerate corporation unionized collective
         null, // query executor service


@@ -318,6 +318,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
         null, // segment mover
         null, // segment archiver
         null, // segment announcer,
+        null,
         notifierFactory,
         null, // query runner factory conglomerate corporation unionized collective
         null, // query executor service


@@ -102,6 +102,7 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import io.druid.server.DruidNode;
 import io.druid.server.coordination.DataSegmentAnnouncer;
+import io.druid.server.coordination.DataSegmentServerAnnouncer;
 import io.druid.server.metrics.NoopServiceEmitter;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
@@ -566,13 +567,8 @@ public class TaskLifecycleTest
       {
       }
-
-      @Override
-      public boolean isAnnounced(DataSegment segment)
-      {
-        return false;
-      }
     }, // segment announcer
+    EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
     handoffNotifierFactory,
     queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
     MoreExecutors.sameThreadExecutor(), // query executor service


@@ -59,12 +59,6 @@ public class TestDataSegmentAnnouncer implements DataSegmentAnnouncer
     }
   }
-
-  @Override
-  public boolean isAnnounced(DataSegment segment)
-  {
-    return announcedSegments.contains(segment);
-  }
   public Set<DataSegment> getAnnouncedSegments()
   {
     return ImmutableSet.copyOf(announcedSegments);


@@ -131,4 +131,16 @@ public class TestServerView implements FilteredServerInventoryView, ServerView.S
   {
     return null;
   }
+
+  @Override
+  public boolean isStarted()
+  {
+    return true;
+  }
+
+  @Override
+  public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
+  {
+    return false;
+  }
 }


@@ -169,7 +169,7 @@ public class WorkerTaskMonitorTest
     new TaskToolboxFactory(
         taskConfig,
         taskActionClientFactory,
-        null, null, null, null, null, null, notifierFactory, null, null, null, new SegmentLoaderFactory(
+        null, null, null, null, null, null, null, notifierFactory, null, null, null, new SegmentLoaderFactory(
             new SegmentLoaderLocalCacheManager(
                 null,
                 new SegmentLoaderConfig()


@@ -0,0 +1,365 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.MapMaker;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.curator.inventory.CuratorInventoryManager;
import io.druid.curator.inventory.CuratorInventoryManagerStrategy;
import io.druid.curator.inventory.InventoryManagerConfig;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*/
public abstract class AbstractCuratorServerInventoryView<InventoryType> implements ServerInventoryView
{
private final EmittingLogger log;
private final CuratorFramework curator;
private final CuratorInventoryManager<DruidServer, InventoryType> inventoryManager;
private final AtomicBoolean started = new AtomicBoolean(false);
private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new MapMaker().makeMap();
private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new MapMaker().makeMap();
public AbstractCuratorServerInventoryView(
final EmittingLogger log,
final String announcementsPath,
final String inventoryPath,
final CuratorFramework curator,
final ObjectMapper jsonMapper,
final TypeReference<InventoryType> typeReference
)
{
this.log = log;
this.curator = curator;
this.inventoryManager = new CuratorInventoryManager<>(
curator,
new InventoryManagerConfig()
{
@Override
public String getContainerPath()
{
return announcementsPath;
}
@Override
public String getInventoryPath()
{
return inventoryPath;
}
},
Execs.singleThreaded("ServerInventoryView-%s"),
new CuratorInventoryManagerStrategy<DruidServer, InventoryType>()
{
@Override
public DruidServer deserializeContainer(byte[] bytes)
{
try {
return jsonMapper.readValue(bytes, DruidServer.class);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@Override
public InventoryType deserializeInventory(byte[] bytes)
{
try {
return jsonMapper.readValue(bytes, typeReference);
}
catch (IOException e) {
CharBuffer.wrap(StringUtils.fromUtf8(bytes).toCharArray());
CharBuffer charBuffer = Charsets.UTF_8.decode(ByteBuffer.wrap(bytes));
log.error(e, "Could not parse json: %s", charBuffer.toString());
throw Throwables.propagate(e);
}
}
@Override
public void newContainer(DruidServer container)
{
log.info("New Server[%s]", container);
}
@Override
public void deadContainer(DruidServer deadContainer)
{
log.info("Server Disappeared[%s]", deadContainer);
runServerCallbacks(deadContainer);
}
@Override
public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer)
{
return newContainer.addDataSegments(oldContainer);
}
@Override
public DruidServer addInventory(
final DruidServer container,
String inventoryKey,
final InventoryType inventory
)
{
return addInnerInventory(container, inventoryKey, inventory);
}
@Override
public DruidServer updateInventory(
DruidServer container, String inventoryKey, InventoryType inventory
)
{
return updateInnerInventory(container, inventoryKey, inventory);
}
@Override
public DruidServer removeInventory(final DruidServer container, String inventoryKey)
{
return removeInnerInventory(container, inventoryKey);
}
@Override
public void inventoryInitialized()
{
log.info("Inventory Initialized");
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentViewInitialized();
}
}
);
}
}
);
}
@LifecycleStart
public void start() throws Exception
{
synchronized (started) {
if (!started.get()) {
inventoryManager.start();
started.set(true);
}
}
}
@LifecycleStop
public void stop() throws IOException
{
synchronized (started) {
if (started.getAndSet(false)) {
inventoryManager.stop();
}
}
}
@Override
public boolean isStarted()
{
return started.get();
}
@Override
public DruidServer getInventoryValue(String containerKey)
{
return inventoryManager.getInventoryValue(containerKey);
}
@Override
public Iterable<DruidServer> getInventory()
{
return inventoryManager.getInventory();
}
@Override
public void registerServerCallback(Executor exec, ServerCallback callback)
{
serverCallbacks.put(callback, exec);
}
@Override
public void registerSegmentCallback(Executor exec, SegmentCallback callback)
{
segmentCallbacks.put(callback, exec);
}
public InventoryManagerConfig getInventoryManagerConfig()
{
return inventoryManager.getConfig();
}
protected void runSegmentCallbacks(
final Function<SegmentCallback, CallbackAction> fn
)
{
for (final Map.Entry<SegmentCallback, Executor> entry : segmentCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
@Override
public void run()
{
if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
segmentCallbackRemoved(entry.getKey());
segmentCallbacks.remove(entry.getKey());
}
}
}
);
}
}
protected void runServerCallbacks(final DruidServer server)
{
for (final Map.Entry<ServerCallback, Executor> entry : serverCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
@Override
public void run()
{
if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
serverCallbacks.remove(entry.getKey());
}
}
}
);
}
}
protected void addSingleInventory(
final DruidServer container,
final DataSegment inventory
)
{
log.debug("Server[%s] added segment[%s]", container.getName(), inventory.getIdentifier());
if (container.getSegment(inventory.getIdentifier()) != null) {
log.warn(
"Not adding or running callbacks for existing segment[%s] on server[%s]",
inventory.getIdentifier(),
container.getName()
);
return;
}
container.addDataSegment(inventory.getIdentifier(), inventory);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentAdded(container.getMetadata(), inventory);
}
}
);
}
protected void removeSingleInventory(final DruidServer container, String inventoryKey)
{
log.debug("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
final DataSegment segment = container.getSegment(inventoryKey);
if (segment == null) {
log.warn(
"Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
inventoryKey,
container.getName()
);
return;
}
container.removeDataSegment(inventoryKey);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentRemoved(container.getMetadata(), segment);
}
}
);
}
@Override
public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
{
try {
String toServedSegPath = ZKPaths.makePath(
ZKPaths.makePath(getInventoryManagerConfig().getInventoryPath(), serverKey),
segment.getIdentifier()
);
return curator.checkExists().forPath(toServedSegPath) != null;
} catch (Exception ex) {
throw Throwables.propagate(ex);
}
}
protected abstract DruidServer addInnerInventory(
final DruidServer container,
String inventoryKey,
final InventoryType inventory
);
protected abstract DruidServer updateInnerInventory(
final DruidServer container,
String inventoryKey,
final InventoryType inventory
);
protected abstract DruidServer removeInnerInventory(
final DruidServer container,
String inventoryKey
);
protected abstract void segmentCallbackRemoved(SegmentCallback callback);
}
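The four abstract hooks at the bottom are the only points a concrete Curator-based view has to fill in; BatchServerInventoryView (the next file) supplies them for Set<DataSegment> znodes. A hedged sketch of a minimal subclass where each znode holds a single DataSegment follows; the class name and constructor wiring are illustrative, patterned on the legacy single-server view rather than copied from this commit.

package io.druid.client;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.emitter.EmittingLogger;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;

// Hedged sketch: a minimal concrete view implementing only the four abstract hooks,
// delegating to the addSingleInventory/removeSingleInventory helpers defined above.
public class SingleSegmentInventoryView extends AbstractCuratorServerInventoryView<DataSegment>
{
  private static final EmittingLogger log = new EmittingLogger(SingleSegmentInventoryView.class);

  public SingleSegmentInventoryView(
      String announcementsPath,
      String inventoryPath,
      CuratorFramework curator,
      ObjectMapper jsonMapper
  )
  {
    super(log, announcementsPath, inventoryPath, curator, jsonMapper, new TypeReference<DataSegment>() {});
  }

  @Override
  protected DruidServer addInnerInventory(DruidServer container, String inventoryKey, DataSegment inventory)
  {
    addSingleInventory(container, inventory);
    return container;
  }

  @Override
  protected DruidServer updateInnerInventory(DruidServer container, String inventoryKey, DataSegment inventory)
  {
    return addInnerInventory(container, inventoryKey, inventory);
  }

  @Override
  protected DruidServer removeInnerInventory(DruidServer container, String inventoryKey)
  {
    removeSingleInventory(container, inventoryKey);
    return container;
  }

  @Override
  protected void segmentCallbackRemoved(SegmentCallback callback)
  {
    // no per-callback filter state to clean up in this sketch
  }
}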


@@ -45,7 +45,7 @@ import java.util.concurrent.Executor;
 /**
  */
 @ManageLifecycle
-public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>>
+public class BatchServerInventoryView extends AbstractCuratorServerInventoryView<Set<DataSegment>>
     implements FilteredServerInventoryView
 {
   private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);


@@ -207,4 +207,16 @@ public class CoordinatorServerView implements InventoryView
   {
     return baseView.getInventory();
   }
+
+  @Override
+  public boolean isStarted()
+  {
+    return baseView.isStarted();
+  }
+
+  @Override
+  public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
+  {
+    return baseView.isSegmentLoadedByServer(serverKey, segment);
+  }
 }


@@ -226,6 +226,15 @@ public class DruidServer implements Comparable
     return dataSources.values();
   }
+
+  public void removeAllSegments()
+  {
+    synchronized (lock) {
+      dataSources.clear();
+      segments.clear();
+      currSize = 0;
+    }
+  }
   @Override
   public boolean equals(Object o)
   {


@@ -0,0 +1,174 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.curator.inventory.CuratorInventoryManager;
import io.druid.curator.inventory.CuratorInventoryManagerStrategy;
import io.druid.curator.inventory.InventoryManagerConfig;
import io.druid.java.util.common.ISE;
import org.apache.curator.framework.CuratorFramework;
import java.io.IOException;
/**
 * Uses CuratorInventoryManager to discover DruidServer instances that serve segments.
*/
public class DruidServerDiscovery
{
private final EmittingLogger log = new EmittingLogger(DruidServerDiscovery.class);
private final CuratorInventoryManager curatorInventoryManager;
private volatile Listener listener;
DruidServerDiscovery(
final CuratorFramework curatorFramework,
final String announcementsPath,
final ObjectMapper jsonMapper
)
{
curatorInventoryManager = initCuratorInventoryManager(curatorFramework, announcementsPath, jsonMapper);
}
public void start() throws Exception
{
Preconditions.checkNotNull(listener, "listener is not configured yet");
curatorInventoryManager.start();
}
public void stop() throws IOException
{
curatorInventoryManager.stop();
}
private CuratorInventoryManager initCuratorInventoryManager(
final CuratorFramework curator,
final String announcementsPath,
final ObjectMapper jsonMapper
)
{
return new CuratorInventoryManager<>(
curator,
new InventoryManagerConfig()
{
@Override
public String getContainerPath()
{
return announcementsPath;
}
@Override
public String getInventoryPath()
{
return "/NON_EXISTENT_DUMMY_INVENTORY_PATH";
}
},
Execs.singleThreaded("CuratorInventoryManagerBasedServerDiscovery-%s"),
new CuratorInventoryManagerStrategy<DruidServer, Object>()
{
@Override
public DruidServer deserializeContainer(byte[] bytes)
{
try {
return jsonMapper.readValue(bytes, DruidServer.class);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@Override
public void newContainer(DruidServer container)
{
log.info("New Server[%s]", container.getName());
listener.serverAdded(container);
}
@Override
public void deadContainer(DruidServer container)
{
log.info("Server Disappeared[%s]", container.getName());
listener.serverRemoved(container);
}
@Override
public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer)
{
log.info("Server updated[%s]", oldContainer.getName());
return listener.serverUpdated(oldContainer, newContainer);
}
@Override
public Object deserializeInventory(byte[] bytes)
{
throw new ISE("no inventory should exist.");
}
@Override
public DruidServer addInventory(
final DruidServer container,
String inventoryKey,
final Object inventory
)
{
throw new ISE("no inventory should exist.");
}
@Override
public DruidServer updateInventory(
DruidServer container, String inventoryKey, Object inventory
)
{
throw new ISE("no inventory should exist.");
}
@Override
public DruidServer removeInventory(final DruidServer container, String inventoryKey)
{
throw new ISE("no inventory should exist.");
}
@Override
public void inventoryInitialized()
{
log.info("Server inventory initialized.");
listener.initialized();
}
}
);
}
public void registerListener(Listener listener)
{
Preconditions.checkArgument(this.listener == null, "listener registered already.");
this.listener = listener;
}
public interface Listener
{
void serverAdded(DruidServer server);
DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer);
void serverRemoved(DruidServer server);
void initialized();
}
}
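The Listener must be registered before start(); the Preconditions check in start() enforces this ordering. A hedged usage sketch follows, where curator, announcementsPath, and jsonMapper are assumed to be supplied by the surrounding setup, as in the providers later in this commit:

// Hedged sketch: wiring a DruidServerDiscovery. registerListener must precede start().
DruidServerDiscovery discovery = new DruidServerDiscovery(curator, announcementsPath, jsonMapper);
discovery.registerListener(
    new DruidServerDiscovery.Listener()
    {
      @Override
      public void serverAdded(DruidServer server)
      {
        // begin polling this server's segment list
      }

      @Override
      public DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer)
      {
        return newServer; // or merge state, as HttpServerInventoryView does
      }

      @Override
      public void serverRemoved(DruidServer server)
      {
        // drop the server's segment view
      }

      @Override
      public void initialized()
      {
        // every currently announced server has been reported at least once
      }
    }
);
discovery.start();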


@@ -0,0 +1,78 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.metamx.http.client.HttpClient;
import io.druid.guice.annotations.Client;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.Pair;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import javax.validation.constraints.NotNull;
/**
*/
public class FilteredHttpServerInventoryViewProvider implements FilteredServerInventoryViewProvider
{
@JacksonInject
@NotNull
@Client
HttpClient httpClient = null;
@JacksonInject
@NotNull
@Smile
ObjectMapper smileMapper = null;
@JacksonInject
@NotNull
@Json
ObjectMapper jsonMapper = null;
@JacksonInject
@NotNull
HttpServerInventoryViewConfig config = null;
@JacksonInject
@NotNull
private ZkPathsConfig zkPaths = null;
@JacksonInject
@NotNull
private CuratorFramework curator = null;
@Override
public HttpServerInventoryView get()
{
return new HttpServerInventoryView(
jsonMapper, smileMapper, httpClient,
new DruidServerDiscovery(curator, zkPaths.getAnnouncementsPath(), jsonMapper),
Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysTrue(),
config
);
}
}


@@ -27,7 +27,8 @@ import com.google.inject.Provider;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FilteredBatchServerInventoryViewProvider.class)
 @JsonSubTypes(value = {
   @JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerInventoryViewProvider.class),
-  @JsonSubTypes.Type(name = "batch", value = FilteredBatchServerInventoryViewProvider.class)
+  @JsonSubTypes.Type(name = "batch", value = FilteredBatchServerInventoryViewProvider.class),
+  @JsonSubTypes.Type(name = "http", value = FilteredHttpServerInventoryViewProvider.class)
 })
 public interface FilteredServerInventoryViewProvider extends Provider<FilteredServerInventoryView>
 {


@@ -0,0 +1,663 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.io.AppendableByteArrayInputStream;
import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.InputStreamResponseHandler;
import io.druid.concurrent.LifecycleLock;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.server.coordination.DataSegmentChangeCallback;
import io.druid.server.coordination.DataSegmentChangeHandler;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.SegmentChangeRequestHistory;
import io.druid.server.coordination.SegmentChangeRequestsSnapshot;
import io.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.joda.time.Duration;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
/**
 * This class uses CuratorInventoryManager to listen for the membership of queryable servers that serve segments (e.g. historicals).
 * For each queryable server, it issues HTTP GET /druid-internal/v1/segments (see docs in SegmentListerResource.getSegments(..)).
*/
public class HttpServerInventoryView implements ServerInventoryView, FilteredServerInventoryView
{
private final EmittingLogger log = new EmittingLogger(HttpServerInventoryView.class);
private final DruidServerDiscovery serverDiscovery;
private final LifecycleLock lifecycleLock = new LifecycleLock();
private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new MapMaker().makeMap();
private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new MapMaker().makeMap();
private final ConcurrentMap<SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates = new MapMaker()
.makeMap();
private final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter;
private volatile Predicate<Pair<DruidServerMetadata, DataSegment>> finalPredicate;
// For each queryable server, a name -> DruidServerHolder entry is kept
private final Map<String, DruidServerHolder> servers = new HashMap<>();
private volatile ExecutorService executor;
// A queue of queryable server names. Worker threads in the executor take a name and start the
// segment list call, DruidServerHolder.updateSegmentsListAsync(..), which refreshes that server's
// segment list asynchronously and re-enqueues the name for the next update.
private final BlockingQueue<String> queue = new LinkedBlockingDeque<>();
private final HttpClient httpClient;
private final ObjectMapper smileMapper;
private final HttpServerInventoryViewConfig config;
@Inject
public HttpServerInventoryView(
final @Json ObjectMapper jsonMapper,
final @Smile ObjectMapper smileMapper,
final @Global HttpClient httpClient,
final DruidServerDiscovery serverDiscovery,
final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter,
final HttpServerInventoryViewConfig config
)
{
this.httpClient = httpClient;
this.smileMapper = smileMapper;
this.serverDiscovery = serverDiscovery;
this.defaultFilter = defaultFilter;
this.finalPredicate = defaultFilter;
this.config = config;
}
@LifecycleStart
public void start() throws Exception
{
synchronized (lifecycleLock) {
if (!lifecycleLock.canStart()) {
throw new ISE("can't start.");
}
log.info("Starting HttpServerInventoryView.");
try {
executor = Executors.newFixedThreadPool(
config.getNumThreads(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HttpServerInventoryView-%s").build()
);
executor.execute(
new Runnable()
{
@Override
public void run()
{
if (!lifecycleLock.awaitStarted()) {
log.error("WTF! lifecycle not started, segments will not be discovered.");
return;
}
while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
try {
String name = queue.take();
synchronized (servers) {
DruidServerHolder holder = servers.get(name);
if (holder != null) {
holder.updateSegmentsListAsync();
}
}
}
catch (InterruptedException ex) {
log.info("main thread interrupted, served segments list is not synced anymore.");
Thread.currentThread().interrupt();
}
catch (Throwable th) {
log.makeAlert(th, "main thread ignored error").emit();
}
}
log.info("HttpServerInventoryView main thread exited.");
}
}
);
serverDiscovery.registerListener(
new DruidServerDiscovery.Listener()
{
@Override
public void serverAdded(DruidServer server)
{
serverAddedOrUpdated(server);
}
@Override
public DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer)
{
return serverAddedOrUpdated(newServer);
}
@Override
public void serverRemoved(DruidServer server)
{
HttpServerInventoryView.this.serverRemoved(server);
runServerCallbacks(server);
}
@Override
public void initialized()
{
serverInventoryInitialized();
}
}
);
serverDiscovery.start();
log.info("Started HttpServerInventoryView.");
lifecycleLock.started();
} finally {
lifecycleLock.exitStart();
}
}
}
@LifecycleStop
public void stop() throws IOException
{
synchronized (lifecycleLock) {
if (!lifecycleLock.canStop()) {
throw new ISE("can't stop.");
}
log.info("Stopping HttpServerInventoryView.");
serverDiscovery.stop();
if (executor != null) {
executor.shutdownNow();
executor = null;
}
queue.clear();
log.info("Stopped HttpServerInventoryView.");
}
}
@Override
public void registerSegmentCallback(
Executor exec, SegmentCallback callback, Predicate<Pair<DruidServerMetadata, DataSegment>> filter
)
{
segmentCallbacks.put(callback, exec);
segmentPredicates.put(callback, filter);
finalPredicate = Predicates.or(
defaultFilter,
Predicates.or(segmentPredicates.values())
);
}
@Override
public void registerServerCallback(Executor exec, ServerCallback callback)
{
serverCallbacks.put(callback, exec);
}
@Override
public void registerSegmentCallback(Executor exec, SegmentCallback callback)
{
segmentCallbacks.put(callback, exec);
}
@Override
public DruidServer getInventoryValue(String containerKey)
{
synchronized (servers) {
DruidServerHolder holder = servers.get(containerKey);
if (holder != null) {
return holder.druidServer;
}
}
return null;
}
@Override
public Iterable<DruidServer> getInventory()
{
synchronized (servers) {
return Iterables.transform(
servers.values(), new com.google.common.base.Function<DruidServerHolder, DruidServer>()
{
@Override
public DruidServer apply(DruidServerHolder input)
{
return input.druidServer;
}
}
);
}
}
private void runSegmentCallbacks(
final Function<SegmentCallback, CallbackAction> fn
)
{
for (final Map.Entry<SegmentCallback, Executor> entry : segmentCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
@Override
public void run()
{
if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
segmentCallbacks.remove(entry.getKey());
if (segmentPredicates.remove(entry.getKey()) != null) {
finalPredicate = Predicates.or(
defaultFilter,
Predicates.or(segmentPredicates.values())
);
}
}
}
}
);
}
}
private void runServerCallbacks(final DruidServer server)
{
for (final Map.Entry<ServerCallback, Executor> entry : serverCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
@Override
public void run()
{
if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
serverCallbacks.remove(entry.getKey());
}
}
}
);
}
}
// Best-effort wait for the first segment listing fetch from all servers, then call
// segmentViewInitialized on all registered segment callbacks.
private void serverInventoryInitialized()
{
for (DruidServerHolder server : servers.values()) {
server.awaitInitialization();
}
log.info("Calling SegmentCallback.segmentViewInitialized() for all callbacks.");
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentViewInitialized();
}
}
);
}
private DruidServer serverAddedOrUpdated(DruidServer server)
{
DruidServerHolder curr;
DruidServerHolder newHolder;
synchronized (servers) {
curr = servers.get(server.getName());
newHolder = curr == null ? new DruidServerHolder(server) : curr.updatedHolder(server);
servers.put(server.getName(), newHolder);
}
newHolder.updateSegmentsListAsync();
return newHolder.druidServer;
}
private void serverRemoved(DruidServer server)
{
synchronized (servers) {
servers.remove(server.getName());
}
}
public DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer)
{
return serverAddedOrUpdated(newServer);
}
@Override
public boolean isStarted()
{
return lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS);
}
@Override
public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
{
synchronized (servers) {
DruidServerHolder holder = servers.get(serverKey);
if (holder != null) {
return holder.druidServer.getSegment(segment.getIdentifier()) != null;
} else {
return false;
}
}
}
private class DruidServerHolder
{
private final Object lock = new Object();
// The lock keeps the counter state and the segment list in druidServer consistent,
// so that updatedHolder() can safely create a new DruidServerHolder with updated
// DruidServer info.
private final DruidServer druidServer;
private volatile SegmentChangeRequestHistory.Counter counter = null;
private final HostAndPort serverHostAndPort;
private final DataSegmentChangeHandler changeHandler;
private final long serverHttpTimeout = config.getServerTimeout() + 1000;
private final CountDownLatch initializationLatch = new CountDownLatch(1);
DruidServerHolder(DruidServer druidServer)
{
this(druidServer, null);
}
private DruidServerHolder(final DruidServer druidServer, final SegmentChangeRequestHistory.Counter counter)
{
this.druidServer = druidServer;
this.serverHostAndPort = HostAndPort.fromString(druidServer.getHost());
this.counter = counter;
changeHandler = new DataSegmentChangeHandler()
{
@Override
public void addSegment(
final DataSegment segment, final DataSegmentChangeCallback callback
)
{
if (finalPredicate.apply(Pair.of(druidServer.getMetadata(), segment))) {
druidServer.addDataSegment(segment.getIdentifier(), segment);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentAdded(druidServer.getMetadata(), segment);
}
}
);
}
}
@Override
public void removeSegment(
final DataSegment segment, final DataSegmentChangeCallback callback
)
{
druidServer.removeDataSegment(segment.getIdentifier());
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentRemoved(druidServer.getMetadata(), segment);
}
}
);
}
};
}
// Wait for the first fetch of the segment listing from this server.
void awaitInitialization()
{
try {
if (!initializationLatch.await(serverHttpTimeout, TimeUnit.MILLISECONDS)) {
log.warn("Await initialization timed out for server [%s].", druidServer.getName());
}
} catch (InterruptedException ex) {
log.warn("Await initialization interrupted while waiting on server [%s].", druidServer.getName());
Thread.currentThread().interrupt();
}
}
DruidServerHolder updatedHolder(DruidServer server)
{
synchronized (lock) {
return new DruidServerHolder(server.addDataSegments(druidServer), counter);
}
}
Future<?> updateSegmentsListAsync()
{
try {
final String req;
if (counter != null) {
req = String.format(
"/druid-internal/v1/segments?counter=%s&hash=%s&timeout=%s",
counter.getCounter(),
counter.getHash(),
config.getServerTimeout()
);
} else {
req = String.format(
"/druid-internal/v1/segments?counter=-1&timeout=%s",
config.getServerTimeout()
);
}
URL url = new URL("http", serverHostAndPort.getHostText(), serverHostAndPort.getPort(), req);
BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
log.debug("Sending segment list fetch request to [%s] on URL [%s]", druidServer.getName(), url);
ListenableFuture<InputStream> future = httpClient.go(
new Request(HttpMethod.GET, url)
.addHeader(
HttpHeaders.Names.ACCEPT,
SmileMediaTypes.APPLICATION_JACKSON_SMILE
)
.addHeader(HttpHeaders.Names.CONTENT_TYPE, SmileMediaTypes.APPLICATION_JACKSON_SMILE),
responseHandler,
new Duration(serverHttpTimeout)
);
log.debug("Sent segment list fetch request to [%s]", druidServer.getName());
Futures.addCallback(
future,
new FutureCallback<InputStream>()
{
@Override
public void onSuccess(InputStream stream)
{
try {
if (responseHandler.status == HttpServletResponse.SC_NO_CONTENT) {
log.debug("Received NO CONTENT from [%s]", druidServer.getName());
return;
} else if (responseHandler.status != HttpServletResponse.SC_OK) {
onFailure(null);
return;
}
log.debug("Received segment list response from [%s]", druidServer.getName());
SegmentChangeRequestsSnapshot delta = smileMapper.readValue(
stream,
SegmentChangeRequestsSnapshot.class
);
log.debug("Finished reading segment list response from [%s]", druidServer.getName());
synchronized (lock) {
if (delta.isResetCounter()) {
log.debug(
"Server [%s] requested resetCounter for reason [%s].",
druidServer.getName(),
delta.getResetCause()
);
counter = null;
return;
}
if (counter == null) {
druidServer.removeAllSegments();
}
for (DataSegmentChangeRequest request : delta.getRequests()) {
request.go(changeHandler, null);
}
counter = delta.getCounter();
}
initializationLatch.countDown();
}
catch (Exception ex) {
log.error(ex, "error processing segment list response from server [%s]", druidServer.getName());
}
finally {
queue.add(druidServer.getName());
}
}
@Override
public void onFailure(Throwable t)
{
try {
if (t != null) {
log.error(
t,
"failed to fetch segment list from server [%s]. Return code [%s], Reason: [%s]",
druidServer.getName(),
responseHandler.status,
responseHandler.description
);
} else {
log.error(
"failed to fetch segment list from server [%s]. Return code [%s], Reason: [%s]",
druidServer.getName(),
responseHandler.status,
responseHandler.description
);
}
// sleep for a bit so that retry does not happen immediately.
try {
Thread.sleep(5000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
finally {
queue.add(druidServer.getName());
}
}
},
executor
);
return future;
} catch (Throwable th) {
queue.add(druidServer.getName());
log.makeAlert(th, "Fatal error while fetching segment list from server [%s].", druidServer.getName()).emit();
// sleep for a bit so that retry does not happen immediately.
try {
Thread.sleep(5000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
throw Throwables.propagate(th);
}
}
}
private static class BytesAccumulatingResponseHandler extends InputStreamResponseHandler
{
private int status;
private String description;
@Override
public ClientResponse<AppendableByteArrayInputStream> handleResponse(HttpResponse response)
{
status = response.getStatus().getCode();
description = response.getStatus().getReasonPhrase();
return ClientResponse.unfinished(super.handleResponse(response).getObj());
}
}
}
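Consumers subscribe through the registration methods above; the three-argument overload also narrows which (server, segment) pairs the pollers retain. A hedged sketch of registering a callback, assuming view is an injected HttpServerInventoryView and using the same executor the tests in this commit use:

// Hedged sketch: subscribing to segment changes from the HTTP inventory view.
// CallbackAction.CONTINUE keeps the callback registered; UNREGISTER (used internally
// above) would drop it and recompute the combined filter.
view.registerSegmentCallback(
    MoreExecutors.sameThreadExecutor(),
    new ServerView.SegmentCallback()
    {
      @Override
      public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
      {
        return ServerView.CallbackAction.CONTINUE;
      }

      @Override
      public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
      {
        return ServerView.CallbackAction.CONTINUE;
      }

      @Override
      public ServerView.CallbackAction segmentViewInitialized()
      {
        // fires only after the first listing fetch from every discovered server
        return ServerView.CallbackAction.CONTINUE;
      }
    },
    Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysTrue()
);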


@@ -0,0 +1,61 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.joda.time.Period;
/**
*/
public class HttpServerInventoryViewConfig
{
@JsonProperty
private final long serverTimeout;
@JsonProperty
private final int numThreads;
@JsonCreator
public HttpServerInventoryViewConfig(
@JsonProperty("serverTimeout") Period serverTimeout,
@JsonProperty("numThreads") Integer numThreads
){
this.serverTimeout = serverTimeout != null
? serverTimeout.toStandardDuration().getMillis()
: 4*60*1000; //4 mins
this.numThreads = numThreads != null ? numThreads.intValue() : 5;
Preconditions.checkArgument(this.serverTimeout > 0, "server timeout must be > 0 ms");
Preconditions.checkArgument(this.numThreads > 1, "numThreads must be > 1");
}
public long getServerTimeout()
{
return serverTimeout;
}
public int getNumThreads()
{
return numThreads;
}
}
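Both properties are optional; left unset they resolve to a four-minute server-side hold per poll and five worker threads. A hedged sketch of constructing the config directly (in a deployment these values would come from runtime properties):

// Hedged sketch: defaults and explicit values for HttpServerInventoryViewConfig.
HttpServerInventoryViewConfig defaults = new HttpServerInventoryViewConfig(null, null);
long timeout = defaults.getServerTimeout(); // 240000 ms, i.e. 4 minutes
int threads = defaults.getNumThreads();     // 5

HttpServerInventoryViewConfig tuned = new HttpServerInventoryViewConfig(
    org.joda.time.Period.minutes(1), // stored as 60000 ms
    8                                // must be > 1 per the Preconditions check above
);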


@@ -0,0 +1,80 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.metamx.http.client.HttpClient;
import io.druid.guice.annotations.Client;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.Pair;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import javax.validation.constraints.NotNull;
/**
*/
public class HttpServerInventoryViewProvider implements ServerInventoryViewProvider
{
@JacksonInject
@NotNull
@Client
HttpClient httpClient = null;
@JacksonInject
@NotNull
@Smile
ObjectMapper smileMapper = null;
@JacksonInject
@NotNull
@Json
ObjectMapper jsonMapper = null;
@JacksonInject
@NotNull
HttpServerInventoryViewConfig config = null;
@JacksonInject
@NotNull
private ZkPathsConfig zkPaths = null;
@JacksonInject
@NotNull
private CuratorFramework curator = null;
@Override
public HttpServerInventoryView get()
{
return new HttpServerInventoryView(
jsonMapper,
smileMapper,
httpClient,
new DruidServerDiscovery(curator, zkPaths.getAnnouncementsPath(), jsonMapper),
Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysTrue(),
config
);
}
}


@@ -19,10 +19,14 @@
 package io.druid.client;
+
+import io.druid.timeline.DataSegment;
 /**
  */
 public interface InventoryView
 {
-  public DruidServer getInventoryValue(String string);
-  public Iterable<DruidServer> getInventory();
+  DruidServer getInventoryValue(String string);
+  Iterable<DruidServer> getInventory();
+  boolean isStarted();
+  boolean isSegmentLoadedByServer(String serverKey, DataSegment segment);
 }


@@ -19,329 +19,9 @@
 package io.druid.client;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.base.Throwables;
-import com.google.common.collect.MapMaker;
-import com.metamx.emitter.EmittingLogger;
-import io.druid.concurrent.Execs;
-import io.druid.curator.inventory.CuratorInventoryManager;
-import io.druid.curator.inventory.CuratorInventoryManagerStrategy;
-import io.druid.curator.inventory.InventoryManagerConfig;
-import io.druid.java.util.common.StringUtils;
-import io.druid.java.util.common.lifecycle.LifecycleStart;
-import io.druid.java.util.common.lifecycle.LifecycleStop;
-import io.druid.timeline.DataSegment;
-import org.apache.curator.framework.CuratorFramework;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
 /**
+ * Marker interface for making batch/single/http server inventory view configurable.
  */
-public abstract class ServerInventoryView<InventoryType> implements ServerView, InventoryView
+public interface ServerInventoryView extends ServerView, InventoryView
 {
-  private final EmittingLogger log;
-  private final CuratorInventoryManager<DruidServer, InventoryType> inventoryManager;
-  private final AtomicBoolean started = new AtomicBoolean(false);
-
-  private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new MapMaker().makeMap();
-  private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new MapMaker().makeMap();
-
-  public ServerInventoryView(
-      final EmittingLogger log,
-      final String announcementsPath,
-      final String inventoryPath,
-      final CuratorFramework curator,
-      final ObjectMapper jsonMapper,
-      final TypeReference<InventoryType> typeReference
-  )
-  {
-    this.log = log;
-    this.inventoryManager = new CuratorInventoryManager<>(
-        curator,
-        new InventoryManagerConfig()
-        {
-          @Override
-          public String getContainerPath()
-          {
-            return announcementsPath;
-          }
-
-          @Override
-          public String getInventoryPath()
-          {
-            return inventoryPath;
-          }
-        },
-        Execs.singleThreaded("ServerInventoryView-%s"),
-        new CuratorInventoryManagerStrategy<DruidServer, InventoryType>()
-        {
-          @Override
-          public DruidServer deserializeContainer(byte[] bytes)
-          {
-            try {
-              return jsonMapper.readValue(bytes, DruidServer.class);
-            }
-            catch (IOException e) {
-              throw Throwables.propagate(e);
-            }
-          }
-
-          @Override
-          public InventoryType deserializeInventory(byte[] bytes)
-          {
-            try {
-              return jsonMapper.readValue(bytes, typeReference);
-            }
-            catch (IOException e) {
-              CharBuffer.wrap(StringUtils.fromUtf8(bytes).toCharArray());
-              CharBuffer charBuffer = Charsets.UTF_8.decode(ByteBuffer.wrap(bytes));
-              log.error(e, "Could not parse json: %s", charBuffer.toString());
-              throw Throwables.propagate(e);
-            }
-          }
-
-          @Override
-          public void newContainer(DruidServer container)
-          {
-            log.info("New Server[%s]", container);
-          }
-
-          @Override
-          public void deadContainer(DruidServer deadContainer)
-          {
-            log.info("Server Disappeared[%s]", deadContainer);
-            runServerCallbacks(deadContainer);
-          }
-
-          @Override
-          public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer)
-          {
-            return newContainer.addDataSegments(oldContainer);
-          }
-
-          @Override
-          public DruidServer addInventory(
-              final DruidServer container,
-              String inventoryKey,
-              final InventoryType inventory
-          )
-          {
-            return addInnerInventory(container, inventoryKey, inventory);
-          }
-
-          @Override
-          public DruidServer updateInventory(
-              DruidServer container, String inventoryKey, InventoryType inventory
-          )
-          {
-            return updateInnerInventory(container, inventoryKey, inventory);
-          }
-
-          @Override
-          public DruidServer removeInventory(final DruidServer container, String inventoryKey)
-          {
-            return removeInnerInventory(container, inventoryKey);
}
@Override
public void inventoryInitialized()
{
log.info("Inventory Initialized");
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentViewInitialized();
}
}
);
}
}
);
}
@LifecycleStart
public void start() throws Exception
{
synchronized (started) {
if (!started.get()) {
inventoryManager.start();
started.set(true);
}
}
}
@LifecycleStop
public void stop() throws IOException
{
synchronized (started) {
if (started.getAndSet(false)) {
inventoryManager.stop();
}
}
}
public boolean isStarted()
{
return started.get();
}
@Override
public DruidServer getInventoryValue(String containerKey)
{
return inventoryManager.getInventoryValue(containerKey);
}
@Override
public Iterable<DruidServer> getInventory()
{
return inventoryManager.getInventory();
}
@Override
public void registerServerCallback(Executor exec, ServerCallback callback)
{
serverCallbacks.put(callback, exec);
}
@Override
public void registerSegmentCallback(Executor exec, SegmentCallback callback)
{
segmentCallbacks.put(callback, exec);
}
public InventoryManagerConfig getInventoryManagerConfig()
{
return inventoryManager.getConfig();
}
protected void runSegmentCallbacks(
final Function<SegmentCallback, CallbackAction> fn
)
{
for (final Map.Entry<SegmentCallback, Executor> entry : segmentCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
@Override
public void run()
{
if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
segmentCallbackRemoved(entry.getKey());
segmentCallbacks.remove(entry.getKey());
}
}
}
);
}
}
protected void runServerCallbacks(final DruidServer server)
{
for (final Map.Entry<ServerCallback, Executor> entry : serverCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
@Override
public void run()
{
if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
serverCallbacks.remove(entry.getKey());
}
}
}
);
}
}
protected void addSingleInventory(
final DruidServer container,
final DataSegment inventory
)
{
log.debug("Server[%s] added segment[%s]", container.getName(), inventory.getIdentifier());
if (container.getSegment(inventory.getIdentifier()) != null) {
log.warn(
"Not adding or running callbacks for existing segment[%s] on server[%s]",
inventory.getIdentifier(),
container.getName()
);
return;
}
container.addDataSegment(inventory.getIdentifier(), inventory);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentAdded(container.getMetadata(), inventory);
}
}
);
}
protected void removeSingleInventory(final DruidServer container, String inventoryKey)
{
log.debug("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
final DataSegment segment = container.getSegment(inventoryKey);
if (segment == null) {
log.warn(
"Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
inventoryKey,
container.getName()
);
return;
}
container.removeDataSegment(inventoryKey);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentRemoved(container.getMetadata(), segment);
}
}
);
}
protected abstract DruidServer addInnerInventory(
final DruidServer container,
String inventoryKey,
final InventoryType inventory
);
protected abstract DruidServer updateInnerInventory(
final DruidServer container,
String inventoryKey,
final InventoryType inventory
);
protected abstract DruidServer removeInnerInventory(
final DruidServer container,
String inventoryKey
);
protected abstract void segmentCallbackRemoved(SegmentCallback callback);
} }

View File

@ -28,7 +28,8 @@ import com.google.inject.Provider;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = BatchServerInventoryViewProvider.class) @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = BatchServerInventoryViewProvider.class)
@JsonSubTypes(value = { @JsonSubTypes(value = {
@JsonSubTypes.Type(name = "legacy", value = SingleServerInventoryProvider.class), @JsonSubTypes.Type(name = "legacy", value = SingleServerInventoryProvider.class),
@JsonSubTypes.Type(name = "batch", value = BatchServerInventoryViewProvider.class) @JsonSubTypes.Type(name = "batch", value = BatchServerInventoryViewProvider.class),
@JsonSubTypes.Type(name = "http", value = HttpServerInventoryViewProvider.class),
}) })
public interface ServerInventoryViewProvider extends Provider<ServerInventoryView> public interface ServerInventoryViewProvider extends Provider<ServerInventoryView>
{ {

View File

@ -40,7 +40,7 @@ import java.util.concurrent.Executor;
/** /**
*/ */
@ManageLifecycle @ManageLifecycle
public class SingleServerInventoryView extends ServerInventoryView<DataSegment> implements FilteredServerInventoryView public class SingleServerInventoryView extends AbstractCuratorServerInventoryView<DataSegment> implements FilteredServerInventoryView
{ {
private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class); private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class);

View File

@ -25,8 +25,10 @@ import com.google.inject.Provides;
import io.druid.concurrent.Execs; import io.druid.concurrent.Execs;
import io.druid.curator.announcement.Announcer; import io.druid.curator.announcement.Announcer;
import io.druid.server.coordination.BatchDataSegmentAnnouncer; import io.druid.server.coordination.BatchDataSegmentAnnouncer;
import io.druid.server.coordination.CuratorDataSegmentServerAnnouncer;
import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentAnnouncerProvider; import io.druid.server.coordination.DataSegmentAnnouncerProvider;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig; import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -40,7 +42,8 @@ public class AnnouncerModule implements Module
JsonConfigProvider.bind(binder, "druid.announcer", BatchDataSegmentAnnouncerConfig.class); JsonConfigProvider.bind(binder, "druid.announcer", BatchDataSegmentAnnouncerConfig.class);
JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class); JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class);
binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class); binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class);
binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleLast.class); binder.bind(BatchDataSegmentAnnouncer.class).in(LazySingleton.class);
binder.bind(DataSegmentServerAnnouncer.class).to(CuratorDataSegmentServerAnnouncer.class).in(LazySingleton.class);
} }
@Provides @Provides

View File

@ -23,6 +23,7 @@ import com.google.inject.Binder;
import com.google.inject.Module; import com.google.inject.Module;
import io.druid.client.FilteredServerInventoryView; import io.druid.client.FilteredServerInventoryView;
import io.druid.client.FilteredServerInventoryViewProvider; import io.druid.client.FilteredServerInventoryViewProvider;
import io.druid.client.HttpServerInventoryViewConfig;
import io.druid.client.InventoryView; import io.druid.client.InventoryView;
import io.druid.client.ServerInventoryView; import io.druid.client.ServerInventoryView;
import io.druid.client.ServerInventoryViewProvider; import io.druid.client.ServerInventoryViewProvider;
@ -37,6 +38,7 @@ public class ServerViewModule implements Module
{ {
JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class); JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class);
JsonConfigProvider.bind(binder, "druid.announcer", FilteredServerInventoryViewProvider.class); JsonConfigProvider.bind(binder, "druid.announcer", FilteredServerInventoryViewProvider.class);
JsonConfigProvider.bind(binder, "druid.announcer.http", HttpServerInventoryViewConfig.class);
binder.bind(InventoryView.class).to(ServerInventoryView.class); binder.bind(InventoryView.class).to(ServerInventoryView.class);
binder.bind(ServerView.class).to(ServerInventoryView.class); binder.bind(ServerView.class).to(ServerInventoryView.class);
binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class); binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class);

View File

@ -50,6 +50,7 @@ import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.plumber.Committers; import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Plumbers; import io.druid.segment.realtime.plumber.Plumbers;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.Closeable; import java.io.Closeable;
@ -66,6 +67,7 @@ public class RealtimeManager implements QuerySegmentWalker
private final List<FireDepartment> fireDepartments; private final List<FireDepartment> fireDepartments;
private final QueryRunnerFactoryConglomerate conglomerate; private final QueryRunnerFactoryConglomerate conglomerate;
private final DataSegmentServerAnnouncer serverAnnouncer;
/** /**
* key=data source name,value=mappings of partition number to FireChief * key=data source name,value=mappings of partition number to FireChief
@ -75,29 +77,31 @@ public class RealtimeManager implements QuerySegmentWalker
@Inject @Inject
public RealtimeManager( public RealtimeManager(
List<FireDepartment> fireDepartments, List<FireDepartment> fireDepartments,
QueryRunnerFactoryConglomerate conglomerate QueryRunnerFactoryConglomerate conglomerate,
DataSegmentServerAnnouncer serverAnnouncer
) )
{ {
this.fireDepartments = fireDepartments; this(fireDepartments, conglomerate, serverAnnouncer, Maps.newHashMap());
this.conglomerate = conglomerate;
this.chiefs = Maps.newHashMap();
} }
RealtimeManager( RealtimeManager(
List<FireDepartment> fireDepartments, List<FireDepartment> fireDepartments,
QueryRunnerFactoryConglomerate conglomerate, QueryRunnerFactoryConglomerate conglomerate,
DataSegmentServerAnnouncer serverAnnouncer,
Map<String, Map<Integer, FireChief>> chiefs Map<String, Map<Integer, FireChief>> chiefs
) )
{ {
this.fireDepartments = fireDepartments; this.fireDepartments = fireDepartments;
this.conglomerate = conglomerate; this.conglomerate = conglomerate;
this.serverAnnouncer = serverAnnouncer;
this.chiefs = chiefs; this.chiefs = chiefs;
} }
@LifecycleStart @LifecycleStart
public void start() throws IOException public void start() throws IOException
{ {
serverAnnouncer.announce();
for (final FireDepartment fireDepartment : fireDepartments) { for (final FireDepartment fireDepartment : fireDepartments) {
final DataSchema schema = fireDepartment.getDataSchema(); final DataSchema schema = fireDepartment.getDataSchema();
@ -129,6 +133,8 @@ public class RealtimeManager implements QuerySegmentWalker
CloseQuietly.close(chief); CloseQuietly.close(chief);
} }
} }
serverAnnouncer.unannounce();
} }
public FireDepartmentMetrics getMetrics(String datasource) public FireDepartmentMetrics getMetrics(String datasource)

View File

@ -112,12 +112,6 @@ public class Appenderators
{ {
// Do nothing // Do nothing
} }
@Override
public boolean isAnnounced(DataSegment segment)
{
return false;
}
}, },
null, null,
null, null,

View File

@ -24,10 +24,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.druid.common.utils.UUIDUtils; import io.druid.common.utils.UUIDUtils;
import io.druid.curator.announcement.Announcer; import io.druid.curator.announcement.Announcer;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
@ -38,8 +40,11 @@ import io.druid.timeline.DataSegment;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListSet;
@ -47,7 +52,7 @@ import java.util.concurrent.atomic.AtomicLong;
/** /**
*/ */
public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer
{ {
private static final Logger log = new Logger(BatchDataSegmentAnnouncer.class); private static final Logger log = new Logger(BatchDataSegmentAnnouncer.class);
@ -64,6 +69,9 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newConcurrentMap(); private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newConcurrentMap();
private final Function<DataSegment, DataSegment> segmentTransformer; private final Function<DataSegment, DataSegment> segmentTransformer;
private final SegmentChangeRequestHistory changes = new SegmentChangeRequestHistory();
private final SegmentZNode dummyZnode;
@Inject @Inject
public BatchDataSegmentAnnouncer( public BatchDataSegmentAnnouncer(
DruidServerMetadata server, DruidServerMetadata server,
@ -73,7 +81,6 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
ObjectMapper jsonMapper ObjectMapper jsonMapper
) )
{ {
super(server, zkPaths, announcer, jsonMapper);
this.config = config; this.config = config;
this.announcer = announcer; this.announcer = announcer;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
@ -95,18 +102,37 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
return rv; return rv;
} }
}; };
if (this.config.isSkipSegmentAnnouncementOnZk()) {
dummyZnode = new SegmentZNode("PLACE_HOLDER_ONLY");
} else {
dummyZnode = null;
}
} }
@Override @Override
public void announceSegment(DataSegment segment) throws IOException public void announceSegment(DataSegment segment) throws IOException
{ {
DataSegment toAnnounce = segmentTransformer.apply(segment); if (segmentLookup.containsKey(segment)) {
int newBytesLen = jsonMapper.writeValueAsBytes(toAnnounce).length; log.info("Skipping announcement of segment [%s]. Announcement exists already.", segment.getIdentifier());
if (newBytesLen > config.getMaxBytesPerNode()) { return;
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxBytesPerNode());
} }
DataSegment toAnnounce = segmentTransformer.apply(segment);
synchronized (lock) { synchronized (lock) {
changes.addSegmentChangeRequest(new SegmentChangeRequestLoad(toAnnounce));
if (config.isSkipSegmentAnnouncementOnZk()) {
segmentLookup.put(segment, dummyZnode);
return;
}
int newBytesLen = jsonMapper.writeValueAsBytes(toAnnounce).length;
if (newBytesLen > config.getMaxBytesPerNode()) {
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxBytesPerNode());
}
boolean done = false; boolean done = false;
if (!availableZNodes.isEmpty()) { if (!availableZNodes.isEmpty()) {
// update existing batch // update existing batch
@ -155,13 +181,20 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
@Override @Override
public void unannounceSegment(DataSegment segment) throws IOException public void unannounceSegment(DataSegment segment) throws IOException
{ {
final SegmentZNode segmentZNode = segmentLookup.remove(segment);
if (segmentZNode == null) {
log.warn("No path to unannounce segment[%s]", segment.getIdentifier());
return;
}
synchronized (lock) { synchronized (lock) {
final SegmentZNode segmentZNode = segmentLookup.remove(segment);
if (segmentZNode == null) {
log.warn("No path to unannounce segment[%s]", segment.getIdentifier());
return;
}
changes.addSegmentChangeRequest(new SegmentChangeRequestDrop(segment));
if (config.isSkipSegmentAnnouncementOnZk()) {
return;
}
segmentZNode.removeSegment(segment); segmentZNode.removeSegment(segment);
log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath()); log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
@ -178,38 +211,60 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
@Override @Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException public void announceSegments(Iterable<DataSegment> segments) throws IOException
{ {
Iterable<DataSegment> toAnnounce = Iterables.transform(segments, segmentTransformer);
SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath()); SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath());
Set<DataSegment> batch = Sets.newHashSet(); Set<DataSegment> batch = Sets.newHashSet();
List<DataSegmentChangeRequest> changesBatch = new ArrayList<>();
int byteSize = 0; int byteSize = 0;
int count = 0; int count = 0;
for (DataSegment segment : toAnnounce) { synchronized (lock) {
int newBytesLen = jsonMapper.writeValueAsBytes(segment).length; for (DataSegment ds : segments) {
if (newBytesLen > config.getMaxBytesPerNode()) { if (segmentLookup.containsKey(ds)) {
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxBytesPerNode()); log.info("Skipping announcement of segment [%s]. Announcement exists already.");
return;
}
DataSegment segment = segmentTransformer.apply(ds);
changesBatch.add(new SegmentChangeRequestLoad(segment));
if (config.isSkipSegmentAnnouncementOnZk()) {
segmentLookup.put(segment, dummyZnode);
continue;
}
int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
if (newBytesLen > config.getMaxBytesPerNode()) {
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxBytesPerNode());
}
if (count >= config.getSegmentsPerNode() || byteSize + newBytesLen > config.getMaxBytesPerNode()) {
segmentZNode.addSegments(batch);
announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
segmentZNode = new SegmentZNode(makeServedSegmentPath());
batch = Sets.newHashSet();
count = 0;
byteSize = 0;
}
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
segmentLookup.put(segment, segmentZNode);
batch.add(segment);
count++;
byteSize += newBytesLen;
} }
if (count >= config.getSegmentsPerNode() || byteSize + newBytesLen > config.getMaxBytesPerNode()) {
segmentZNode.addSegments(batch);
announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
segmentZNode = new SegmentZNode(makeServedSegmentPath());
batch = Sets.newHashSet();
count = 0;
byteSize = 0;
}
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
segmentLookup.put(segment, segmentZNode);
batch.add(segment);
count++;
byteSize += newBytesLen;
} }
segmentZNode.addSegments(batch); changes.addSegmentChangeRequests(changesBatch);
announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
if (!config.isSkipSegmentAnnouncementOnZk()) {
segmentZNode.addSegments(batch);
announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
}
} }
@Override @Override
@ -220,21 +275,46 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
} }
} }
@Override /**
public boolean isAnnounced(DataSegment segment) * Returns Future that lists the segment load/drop requests since given counter.
*/
public ListenableFuture<SegmentChangeRequestsSnapshot> getSegmentChangesSince(SegmentChangeRequestHistory.Counter counter)
{ {
return segmentLookup.containsKey(segment); if (counter.getCounter() < 0) {
synchronized (lock) {
Iterable<DataSegmentChangeRequest> segments = Iterables.transform(
segmentLookup.keySet(),
new Function<DataSegment, DataSegmentChangeRequest>()
{
@Nullable
@Override
public SegmentChangeRequestLoad apply(DataSegment input)
{
return new SegmentChangeRequestLoad(input);
}
}
);
SettableFuture<SegmentChangeRequestsSnapshot> future = SettableFuture.create();
future.set(SegmentChangeRequestsSnapshot.success(changes.getLastCounter(), Lists.newArrayList(segments)));
return future;
}
} else {
return changes.getRequestsSince(counter);
}
} }
private String makeServedSegmentPath() private String makeServedSegmentPath()
{ {
// server.getName() is already in the zk path // server.getName() is already in the zk path
return makeServedSegmentPath(UUIDUtils.generateUuid( return makeServedSegmentPath(
server.getHost(), UUIDUtils.generateUuid(
server.getType(), server.getHost(),
server.getTier(), server.getType(),
new DateTime().toString() server.getTier(),
)); new DateTime().toString()
)
);
} }
private String makeServedSegmentPath(String zNode) private String makeServedSegmentPath(String zNode)

View File

@ -22,19 +22,17 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.inject.Inject;
import io.druid.curator.announcement.Announcer; import io.druid.curator.announcement.Announcer;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
/** /**
*/ */
public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnouncer public class CuratorDataSegmentServerAnnouncer implements DataSegmentServerAnnouncer
{ {
private static final Logger log = new Logger(AbstractDataSegmentAnnouncer.class); private static final Logger log = new Logger(CuratorDataSegmentServerAnnouncer.class);
private final DruidServerMetadata server; private final DruidServerMetadata server;
private final ZkPathsConfig config; private final ZkPathsConfig config;
@ -43,9 +41,10 @@ public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnounc
private final Object lock = new Object(); private final Object lock = new Object();
private volatile boolean started = false; private volatile boolean announced = false;
protected AbstractDataSegmentAnnouncer( @Inject
public CuratorDataSegmentServerAnnouncer(
DruidServerMetadata server, DruidServerMetadata server,
ZkPathsConfig config, ZkPathsConfig config,
Announcer announcer, Announcer announcer,
@ -58,11 +57,11 @@ public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnounc
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
} }
@LifecycleStart @Override
public void start() public void announce()
{ {
synchronized (lock) { synchronized (lock) {
if (started) { if (announced) {
return; return;
} }
@ -75,22 +74,23 @@ public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnounc
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
started = true; announced = true;
} }
} }
@LifecycleStop @Override
public void stop() public void unannounce()
{ {
synchronized (lock) { synchronized (lock) {
if (!started) { if (!announced) {
return; return;
} }
log.info("Stopping %s with config[%s]", getClass(), config); final String path = makeAnnouncementPath();
announcer.unannounce(makeAnnouncementPath()); log.info("Unannouncing self[%s] at [%s]", server, path);
announcer.unannounce(path);
started = false; announced = false;
} }
} }

View File

@ -32,9 +32,4 @@ public interface DataSegmentAnnouncer
public void announceSegments(Iterable<DataSegment> segments) throws IOException; public void announceSegments(Iterable<DataSegment> segments) throws IOException;
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException; public void unannounceSegments(Iterable<DataSegment> segments) throws IOException;
/**
* @return true if the segment was already announced, otherwise false
*/
public boolean isAnnounced(DataSegment segment);
} }

View File

@ -0,0 +1,28 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordination;
/**
*/
public interface DataSegmentServerAnnouncer
{
void announce();
void unannounce();
}

View File

@ -0,0 +1,354 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordination;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.druid.common.utils.StringUtils;
import io.druid.java.util.common.IAE;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* This class keeps a bounded list of segment updates made on the server such as adding/dropping segments.
*
* Clients call addSegmentChangeRequest(DataSegmentChangeRequest) or addSegmentChangeRequests(List<DataSegmentChangeRequest>)
* to add segment updates.
*
* Clients call getRequestsSince(Counter) to obtain a ListenableFuture<SegmentChangeRequestsSnapshot> of the segment
* updates made since the given counter.
*/
public class SegmentChangeRequestHistory
{
private static final int MAX_SIZE = 1000;
private final int maxSize;
private final CircularBuffer<Holder> changes;
@VisibleForTesting
final LinkedHashMap<CustomSettableFuture, Counter> waitingFutures;
private final ExecutorService singleThreadedExecutor;
private final Runnable resolveWaitingFuturesRunnable;
public SegmentChangeRequestHistory()
{
this(MAX_SIZE);
}
public SegmentChangeRequestHistory(int maxSize)
{
this.maxSize = maxSize;
this.changes = new CircularBuffer<>(maxSize);
this.waitingFutures = new LinkedHashMap<>();
this.resolveWaitingFuturesRunnable = new Runnable()
{
@Override
public void run()
{
resolveWaitingFutures();
}
};
this.singleThreadedExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat(
"SegmentChangeRequestHistory"
)
.build()
);
}
/**
* Adds a batch of segment change updates.
*/
public synchronized void addSegmentChangeRequests(List<DataSegmentChangeRequest> requests)
{
for (DataSegmentChangeRequest request : requests) {
changes.add(new Holder(request, getLastCounter().inc()));
}
singleThreadedExecutor.execute(resolveWaitingFuturesRunnable);
}
/**
* Adds a single segment change update.
*/
public synchronized void addSegmentChangeRequest(DataSegmentChangeRequest request)
{
addSegmentChangeRequests(ImmutableList.of(request));
}
/**
* Returns a Future that, on completion, yields the list of segment updates and the associated counter.
* If there are no updates since the given counter, the Future does not complete until an update arrives.
*
* If the counter is older than the maximum number of changes maintained, a SegmentChangeRequestsSnapshot is
* returned with resetCounter set to true.
*
* If there are no updates to provide immediately, a future is created and returned to the caller. This future
* is added to the "waitingFutures" list, and all futures in that list are resolved as soon as a segment
* update is provided.
*/
public synchronized ListenableFuture<SegmentChangeRequestsSnapshot> getRequestsSince(final Counter counter)
{
final CustomSettableFuture future = new CustomSettableFuture(waitingFutures);
if (counter.counter < 0) {
future.setException(new IAE("counter[%s] must be >= 0", counter));
return future;
}
Counter lastCounter = getLastCounter();
if (counter.counter == lastCounter.counter) {
if (!counter.matches(lastCounter)) {
future.setException(new IAE("counter[%s] failed to match with [%s]", counter, lastCounter));
} else {
synchronized (waitingFutures) {
waitingFutures.put(future, counter);
}
}
} else {
try {
future.set(getRequestsSinceWithoutWait(counter));
} catch (Exception ex) {
future.setException(ex);
}
}
return future;
}
private synchronized SegmentChangeRequestsSnapshot getRequestsSinceWithoutWait(final Counter counter)
{
Counter lastCounter = getLastCounter();
if (counter.counter >= lastCounter.counter) {
throw new IAE("counter[%s] >= last counter[%s]", counter, lastCounter);
} else if (lastCounter.counter - counter.counter >= maxSize) {
// Note: a counter reset is requested when the client asks for "maxSize" changes even if all those changes
// are present in the history, because one extra element is needed to match the counter hash.
return SegmentChangeRequestsSnapshot.fail(
StringUtils.safeFormat(
"can't serve request, not enough history is kept. given counter [%s] and current last counter [%s]",
counter,
lastCounter
)
);
} else {
int changeStartIndex = (int) (counter.counter + changes.size() - lastCounter.counter);
Counter counterToMatch = counter.counter == 0 ? Counter.ZERO : changes.get(changeStartIndex - 1).counter;
if (!counterToMatch.matches(counter)) {
throw new IAE("counter[%s] failed to match with [%s]", counter, counterToMatch);
}
List<DataSegmentChangeRequest> result = new ArrayList<>();
for (int i = changeStartIndex; i < changes.size(); i++) {
result.add(changes.get(i).changeRequest);
}
return SegmentChangeRequestsSnapshot.success(changes.get(changes.size() - 1).counter, result);
}
}
private void resolveWaitingFutures()
{
final LinkedHashMap<CustomSettableFuture, Counter> waitingFuturesCopy = new LinkedHashMap<>();
synchronized (waitingFutures) {
waitingFuturesCopy.putAll(waitingFutures);
waitingFutures.clear();
}
for (Map.Entry<CustomSettableFuture, Counter> e : waitingFuturesCopy.entrySet()) {
try {
e.getKey().set(getRequestsSinceWithoutWait(e.getValue()));
} catch (Exception ex) {
e.getKey().setException(ex);
}
}
}
public synchronized Counter getLastCounter()
{
if (changes.size() > 0) {
return changes.get(changes.size() - 1).counter;
} else {
return Counter.ZERO;
}
}
private static class Holder
{
private final DataSegmentChangeRequest changeRequest;
private final Counter counter;
public Holder(DataSegmentChangeRequest changeRequest, Counter counter)
{
this.changeRequest = changeRequest;
this.counter = counter;
}
}
public static class Counter
{
public static final Counter ZERO = new Counter(0);
private final long counter;
private final long hash;
public Counter(long counter)
{
this(counter, System.currentTimeMillis());
}
@JsonCreator
public Counter(
@JsonProperty("counter") long counter,
@JsonProperty("hash") long hash
)
{
this.counter = counter;
this.hash = hash;
}
@JsonProperty
public long getCounter()
{
return counter;
}
@JsonProperty
public long getHash()
{
return hash;
}
public Counter inc()
{
return new Counter(counter + 1);
}
public boolean matches(Counter other)
{
return this.counter == other.counter && this.hash == other.hash;
}
@Override
public String toString()
{
return "Counter{" +
"counter=" + counter +
", hash=" + hash +
'}';
}
}
// Future with cancel() implementation to remove it from "waitingFutures" list
private static class CustomSettableFuture extends AbstractFuture<SegmentChangeRequestsSnapshot>
{
private final LinkedHashMap<CustomSettableFuture, Counter> waitingFutures;
private CustomSettableFuture(LinkedHashMap<CustomSettableFuture, Counter> waitingFutures)
{
this.waitingFutures = waitingFutures;
}
@Override
public boolean set(SegmentChangeRequestsSnapshot value)
{
return super.set(value);
}
@Override
public boolean setException(Throwable throwable)
{
return super.setException(throwable);
}
@Override
public boolean cancel(boolean interruptIfRunning)
{
synchronized (waitingFutures) {
waitingFutures.remove(this);
}
return true;
}
}
static class CircularBuffer<E>
{
private final E[] buffer;
private int start = 0;
private int size = 0;
CircularBuffer(int capacity)
{
buffer = (E[]) new Object[capacity];
}
void add(E item)
{
buffer[start++] = item;
if (start >= buffer.length) {
start = 0;
}
if (size < buffer.length) {
size++;
}
}
E get(int index)
{
Preconditions.checkArgument(index >= 0 && index < size, "invalid index");
int bufferIndex = (start - size + index) % buffer.length;
if (bufferIndex < 0) {
bufferIndex += buffer.length;
}
return buffer[bufferIndex];
}
int size()
{
return size;
}
}
}
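For illustration, a minimal in-process usage sketch of the history above; `segment` is a stand-in for any DataSegment, and exception handling is elided:

// Hypothetical usage sketch; 'segment' stands in for a real DataSegment.
void sketch(SegmentChangeRequestHistory history, DataSegment segment) throws Exception
{
  history.addSegmentChangeRequest(new SegmentChangeRequestLoad(segment));

  // The first poll starts from Counter.ZERO; this future resolves immediately
  // because one update is already recorded.
  SegmentChangeRequestsSnapshot snapshot =
      history.getRequestsSince(SegmentChangeRequestHistory.Counter.ZERO).get();

  // Subsequent polls reuse the returned counter; this future stays pending
  // until the next addSegmentChangeRequest(s) call resolves it.
  ListenableFuture<SegmentChangeRequestsSnapshot> pending =
      history.getRequestsSince(snapshot.getCounter());
}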

View File

@ -0,0 +1,108 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordination;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import java.util.List;
/**
* Return type of SegmentChangeRequestHistory.getRequestsSince(counter).
*/
public class SegmentChangeRequestsSnapshot
{
// If true, the caller should reset the counter and request again.
private final boolean resetCounter;
// Cause of the reset, if resetCounter is true.
private final String resetCause;
// Segment request deltas since the given counter, if resetCounter is false.
private final SegmentChangeRequestHistory.Counter counter;
private final List<DataSegmentChangeRequest> requests;
@JsonCreator
public SegmentChangeRequestsSnapshot(
@JsonProperty("resetCounter") boolean resetCounter,
@JsonProperty("resetCause") String resetCause,
@JsonProperty("counter") SegmentChangeRequestHistory.Counter counter,
@JsonProperty("requests") List<DataSegmentChangeRequest> requests
)
{
this.resetCounter = resetCounter;
this.resetCause = resetCause;
if (resetCounter) {
Preconditions.checkNotNull(resetCause, "NULL resetCause when resetCounter is true.");
}
this.counter = counter;
this.requests = requests;
}
public static SegmentChangeRequestsSnapshot success(SegmentChangeRequestHistory.Counter counter,
List<DataSegmentChangeRequest> requests)
{
return new SegmentChangeRequestsSnapshot(false, null, counter, requests);
}
public static SegmentChangeRequestsSnapshot fail(String resetCause)
{
return new SegmentChangeRequestsSnapshot(true, resetCause, null, null);
}
@JsonProperty
public boolean isResetCounter()
{
return resetCounter;
}
@JsonProperty
public String getResetCause()
{
return resetCause;
}
@JsonProperty
public SegmentChangeRequestHistory.Counter getCounter()
{
return counter;
}
@JsonProperty
public List<DataSegmentChangeRequest> getRequests()
{
return requests;
}
@Override
public String toString()
{
return "SegmentChangeRequestsSnapshot{" +
"resetCounter=" + resetCounter +
", resetCause='" + resetCause + '\'' +
", counter=" + counter +
", requests=" + requests +
'}';
}
}
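A sketch of the consumption pattern the resetCounter flag implies; `fetchSnapshot` is a hypothetical stand-in for calling BatchDataSegmentAnnouncer.getSegmentChangesSince(counter).get(), which treats a negative counter as a request for the full current list:

// Hypothetical consumer loop; fetchSnapshot(counter) stands in for
// BatchDataSegmentAnnouncer.getSegmentChangesSince(counter).get().
SegmentChangeRequestHistory.Counter counter = new SegmentChangeRequestHistory.Counter(-1, -1);
while (true) {
  SegmentChangeRequestsSnapshot snapshot = fetchSnapshot(counter);
  if (snapshot.isResetCounter()) {
    // Not enough history was kept on the server; restart with a full snapshot.
    counter = new SegmentChangeRequestHistory.Counter(-1, -1);
    continue;
  }
  // Apply snapshot.getRequests() (load/drop requests) to the local view here.
  counter = snapshot.getCounter();
}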

View File

@ -72,6 +72,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
private final DruidServerMetadata me; private final DruidServerMetadata me;
private final CuratorFramework curator; private final CuratorFramework curator;
private final DataSegmentAnnouncer announcer; private final DataSegmentAnnouncer announcer;
private final DataSegmentServerAnnouncer serverAnnouncer;
private final ServerManager serverManager; private final ServerManager serverManager;
private final ScheduledExecutorService exec; private final ScheduledExecutorService exec;
private final ConcurrentSkipListSet<DataSegment> segmentsToDelete; private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
@ -87,6 +88,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
ZkPathsConfig zkPaths, ZkPathsConfig zkPaths,
DruidServerMetadata me, DruidServerMetadata me,
DataSegmentAnnouncer announcer, DataSegmentAnnouncer announcer,
DataSegmentServerAnnouncer serverAnnouncer,
CuratorFramework curator, CuratorFramework curator,
ServerManager serverManager, ServerManager serverManager,
ScheduledExecutorFactory factory ScheduledExecutorFactory factory
@ -98,6 +100,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
this.me = me; this.me = me;
this.curator = curator; this.curator = curator;
this.announcer = announcer; this.announcer = announcer;
this.serverAnnouncer = serverAnnouncer;
this.serverManager = serverManager; this.serverManager = serverManager;
this.exec = factory.create(1, "ZkCoordinator-Exec--%d"); this.exec = factory.create(1, "ZkCoordinator-Exec--%d");
@ -132,6 +135,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient()); curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
loadLocalCache(); loadLocalCache();
serverAnnouncer.announce();
loadQueueCache.getListenable().addListener( loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener() new PathChildrenCacheListener()
@ -226,6 +230,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
try { try {
loadQueueCache.close(); loadQueueCache.close();
serverAnnouncer.unannounce();
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
@ -360,13 +365,11 @@ public class ZkCoordinator implements DataSegmentChangeHandler
} }
} }
loadSegment(segment, callback); loadSegment(segment, callback);
if (!announcer.isAnnounced(segment)) { try {
try { announcer.announceSegment(segment);
announcer.announceSegment(segment); }
} catch (IOException e) {
catch (IOException e) { throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
}
} }
} }
catch (SegmentLoadingException e) { catch (SegmentLoadingException e) {
@ -408,14 +411,12 @@ public class ZkCoordinator implements DataSegmentChangeHandler
segment.getIdentifier() segment.getIdentifier()
); );
loadSegment(segment, callback); loadSegment(segment, callback);
if (!announcer.isAnnounced(segment)) { try {
try { backgroundSegmentAnnouncer.announceSegment(segment);
backgroundSegmentAnnouncer.announceSegment(segment); }
} catch (InterruptedException e) {
catch (InterruptedException e) { Thread.currentThread().interrupt();
Thread.currentThread().interrupt(); throw new SegmentLoadingException(e, "Loading Interrupted");
throw new SegmentLoadingException(e, "Loading Interrupted");
}
} }
} }
catch (SegmentLoadingException e) { catch (SegmentLoadingException e) {

View File

@ -117,7 +117,7 @@ public class DruidCoordinator
private final ZkPathsConfig zkPaths; private final ZkPathsConfig zkPaths;
private final JacksonConfigManager configManager; private final JacksonConfigManager configManager;
private final MetadataSegmentManager metadataSegmentManager; private final MetadataSegmentManager metadataSegmentManager;
private final ServerInventoryView<Object> serverInventoryView; private final ServerInventoryView serverInventoryView;
private final MetadataRuleManager metadataRuleManager; private final MetadataRuleManager metadataRuleManager;
private final CuratorFramework curator; private final CuratorFramework curator;
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
@ -362,9 +362,9 @@ public class DruidCoordinator
} }
public void moveSegment( public void moveSegment(
ImmutableDruidServer fromServer, final ImmutableDruidServer fromServer,
ImmutableDruidServer toServer, final ImmutableDruidServer toServer,
String segmentName, final String segmentName,
final LoadPeonCallback callback final LoadPeonCallback callback
) )
{ {
@ -405,10 +405,6 @@ public class DruidCoordinator
toServer.getName() toServer.getName()
), segmentName ), segmentName
); );
final String toServedSegPath = ZKPaths.makePath(
ZKPaths.makePath(serverInventoryView.getInventoryManagerConfig().getInventoryPath(), toServer.getName()),
segmentName
);
loadPeon.loadSegment( loadPeon.loadSegment(
segment, segment,
@ -418,7 +414,7 @@ public class DruidCoordinator
public void execute() public void execute()
{ {
try { try {
if (curator.checkExists().forPath(toServedSegPath) != null && if (serverInventoryView.isSegmentLoadedByServer(toServer.getName(), segment) &&
curator.checkExists().forPath(toLoadQueueSegPath) == null && curator.checkExists().forPath(toLoadQueueSegPath) == null &&
!dropPeon.getSegmentsToDrop().contains(segment)) { !dropPeon.getSegmentsToDrop().contains(segment)) {
dropPeon.dropSegment(segment, callback); dropPeon.dropSegment(segment, callback);

View File

@ -0,0 +1,229 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import com.sun.jersey.spi.container.ResourceFilters;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
import io.druid.server.coordination.SegmentChangeRequestHistory;
import io.druid.server.coordination.SegmentChangeRequestsSnapshot;
import io.druid.server.http.security.StateResourceFilter;
import io.druid.server.security.AuthConfig;
import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
/**
*/
@Path("/druid-internal/v1/segments/")
@ResourceFilters(StateResourceFilter.class)
public class SegmentListerResource
{
protected static final EmittingLogger log = new EmittingLogger(SegmentListerResource.class);
protected final ObjectMapper jsonMapper;
protected final ObjectMapper smileMapper;
protected final AuthConfig authConfig;
private final BatchDataSegmentAnnouncer announcer;
@Inject
public SegmentListerResource(
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
AuthConfig authConfig,
@Nullable BatchDataSegmentAnnouncer announcer
)
{
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.authConfig = authConfig;
this.announcer = announcer;
}
/**
* This endpoint is used by HttpServerInventoryView to keep an up-to-date list of segments served by
* historical/realtime nodes.
*
* This endpoint lists segments served by this server and can also incrementally provide the segments added/dropped
* since the last response.
*
* Here is how this is used.
*
* (1) Client sends the first request /druid-internal/v1/segments?counter=-1&timeout=<timeout>.
* Server responds with the list of segments currently served and a <counter,hash> pair.
*
* (2) Client sends subsequent requests /druid-internal/v1/segments?counter=<counter>&hash=<hash>&timeout=<timeout>,
* where the <counter,hash> values are taken from the last response. Server responds with the list of segment
* updates since the given counter.
*
* This endpoint makes the client wait until either there is some segment update or the given timeout elapses.
*
* So clients keep sending the next request immediately after receiving a response in order to keep the list
* of segments served by this server up to date.
*
* @param counter counter received in the last response.
* @param hash    hash received in the last response.
* @param timeout timeout in milliseconds after which the response is sent even if there are no new segment updates.
* @param req
* @throws IOException
*/
@GET
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public void getSegments(
@QueryParam("counter") long counter,
@QueryParam("hash") long hash,
@QueryParam("timeout") long timeout,
@Context final HttpServletRequest req
) throws IOException
{
if (announcer == null) {
sendErrorResponse(req, HttpServletResponse.SC_NOT_FOUND, "announcer is not available.");
return;
}
if (timeout <= 0) {
sendErrorResponse(req, HttpServletResponse.SC_BAD_REQUEST, "timeout must be positive.");
return;
}
final ResponseContext context = createContext(req.getHeader("Accept"));
final ListenableFuture<SegmentChangeRequestsSnapshot> future = announcer.getSegmentChangesSince(
new SegmentChangeRequestHistory.Counter(
counter,
hash
)
);
final AsyncContext asyncContext = req.startAsync();
asyncContext.addListener(
new AsyncListener()
{
@Override
public void onComplete(AsyncEvent event) throws IOException
{
}
@Override
public void onTimeout(AsyncEvent event) throws IOException
{
// HTTP 204 NO_CONTENT is sent to the client.
future.cancel(true);
event.getAsyncContext().complete();
}
@Override
public void onError(AsyncEvent event) throws IOException
{
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException
{
}
}
);
Futures.addCallback(
future,
new FutureCallback<SegmentChangeRequestsSnapshot>()
{
@Override
public void onSuccess(SegmentChangeRequestsSnapshot result)
{
try {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
response.setStatus(HttpServletResponse.SC_OK);
context.inputMapper.writeValue(asyncContext.getResponse().getOutputStream(), result);
asyncContext.complete();
}
catch (Exception ex) {
log.debug(ex, "Request timed out or closed already.");
}
}
@Override
public void onFailure(Throwable th)
{
try {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
if (th instanceof IllegalArgumentException) {
response.sendError(HttpServletResponse.SC_BAD_REQUEST, th.getMessage());
} else {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, th.getMessage());
}
asyncContext.complete();
}
catch (Exception ex) {
log.debug(ex, "Request timed out or closed already.");
}
}
}
);
asyncContext.setTimeout(timeout);
}
private void sendErrorResponse(HttpServletRequest req, int code, String error) throws IOException
{
AsyncContext asyncContext = req.startAsync();
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
response.sendError(code, error);
asyncContext.complete();
}
private ResponseContext createContext(String requestType)
{
boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(requestType);
return new ResponseContext(isSmile ? smileMapper : jsonMapper);
}
private static class ResponseContext
{
private final ObjectMapper inputMapper;
ResponseContext(ObjectMapper inputMapper)
{
this.inputMapper = inputMapper;
}
}
}
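To make the protocol concrete, a client-side sketch of the long-poll loop described in the Javadoc above; the host and port are illustrative, imports (java.net, Jackson) are elided, and the ObjectMapper setup, which would need the DataSegmentChangeRequest subtypes registered, is an assumption:

// Hypothetical client loop; "historical-host:8083" is illustrative.
ObjectMapper mapper = new ObjectMapper(); // assumes change-request subtypes are registered
long counter = -1;
long hash = 0;
while (true) {
  URL url = new URL(String.format(
      "http://historical-host:8083/druid-internal/v1/segments?counter=%d&hash=%d&timeout=60000",
      counter, hash
  ));
  HttpURLConnection conn = (HttpURLConnection) url.openConnection();
  if (conn.getResponseCode() == HttpURLConnection.HTTP_NO_CONTENT) {
    continue; // long poll timed out with no updates; ask again immediately
  }
  SegmentChangeRequestsSnapshot snapshot =
      mapper.readValue(conn.getInputStream(), SegmentChangeRequestsSnapshot.class);
  if (snapshot.isResetCounter()) {
    counter = -1; // server kept too little history; refetch the full list
    continue;
  }
  // Apply snapshot.getRequests() to the local inventory here.
  counter = snapshot.getCounter().getCounter();
  hash = snapshot.getCounter().getHash();
}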

View File

@ -45,6 +45,9 @@ public class BatchDataSegmentAnnouncerConfig
@JsonProperty @JsonProperty
private boolean skipDimensionsAndMetrics = false; private boolean skipDimensionsAndMetrics = false;
@JsonProperty
private boolean skipSegmentAnnouncementOnZk = false;
public int getSegmentsPerNode() public int getSegmentsPerNode()
{ {
return segmentsPerNode; return segmentsPerNode;
@ -65,4 +68,8 @@ public class BatchDataSegmentAnnouncerConfig
return skipDimensionsAndMetrics; return skipDimensionsAndMetrics;
} }
public boolean isSkipSegmentAnnouncementOnZk()
{
return skipSegmentAnnouncementOnZk;
}
} }
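A sketch of enabling the new flag above, assuming the `druid.announcer` prefix that AnnouncerModule binds BatchDataSegmentAnnouncerConfig under; this only makes sense on clusters where brokers and coordinators use the HTTP inventory view, since segment znodes are no longer written:

# Hypothetical runtime.properties snippet.
druid.announcer.skipSegmentAnnouncementOnZk=true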

View File

@ -60,7 +60,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase
private CountDownLatch segmentAddedLatch; private CountDownLatch segmentAddedLatch;
private CountDownLatch segmentRemovedLatch; private CountDownLatch segmentRemovedLatch;
private ServerInventoryView baseView; private BatchServerInventoryView baseView;
private CoordinatorServerView overlordServerView; private CoordinatorServerView overlordServerView;
public CoordinatorServerViewTest() public CoordinatorServerViewTest()

View File

@@ -30,7 +30,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.druid.client.BatchServerInventoryView;
 import io.druid.client.DruidServer;
 import io.druid.client.ServerView;
@@ -41,6 +40,8 @@ import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.guava.Comparators;
 import io.druid.server.coordination.BatchDataSegmentAnnouncer;
+import io.druid.server.coordination.CuratorDataSegmentServerAnnouncer;
+import io.druid.server.coordination.DataSegmentServerAnnouncer;
 import io.druid.server.coordination.DruidServerMetadata;
 import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
 import io.druid.server.initialization.ZkPathsConfig;
@@ -87,6 +88,7 @@ public class BatchServerInventoryViewTest
   private ObjectMapper jsonMapper;
   private Announcer announcer;
   private BatchDataSegmentAnnouncer segmentAnnouncer;
+  private DataSegmentServerAnnouncer serverAnnouncer;
   private Set<DataSegment> testSegments;
   private BatchServerInventoryView batchServerInventoryView;
   private BatchServerInventoryView filteredBatchServerInventoryView;
@@ -118,15 +120,34 @@ public class BatchServerInventoryViewTest
     );
     announcer.start();
 
+    DruidServerMetadata serverMetadata = new DruidServerMetadata(
+        "id",
+        "host",
+        Long.MAX_VALUE,
+        "type",
+        "tier",
+        0
+    );
+
+    ZkPathsConfig zkPathsConfig = new ZkPathsConfig()
+    {
+      @Override
+      public String getBase()
+      {
+        return testBasePath;
+      }
+    };
+
+    serverAnnouncer = new CuratorDataSegmentServerAnnouncer(
+        serverMetadata,
+        zkPathsConfig,
+        announcer,
+        jsonMapper
+    );
+    serverAnnouncer.announce();
+
     segmentAnnouncer = new BatchDataSegmentAnnouncer(
-        new DruidServerMetadata(
-            "id",
-            "host",
-            Long.MAX_VALUE,
-            "type",
-            "tier",
-            0
-        ),
+        serverMetadata,
         new BatchDataSegmentAnnouncerConfig()
         {
           @Override
@@ -135,18 +156,10 @@ public class BatchServerInventoryViewTest
             return 50;
           }
         },
-        new ZkPathsConfig()
-        {
-          @Override
-          public String getBase()
-          {
-            return testBasePath;
-          }
-        },
+        zkPathsConfig,
         announcer,
         jsonMapper
     );
-    segmentAnnouncer.start();
 
     testSegments = Sets.newConcurrentHashSet();
     for (int i = 0; i < INITIAL_SEGMENTS; i++) {
@@ -207,7 +220,7 @@ public class BatchServerInventoryViewTest
   {
     batchServerInventoryView.stop();
     filteredBatchServerInventoryView.stop();
-    segmentAnnouncer.stop();
+    serverAnnouncer.unannounce();
     announcer.stop();
     cf.close();
     testingCluster.stop();
@@ -453,7 +466,6 @@ public class BatchServerInventoryViewTest
         announcer,
         jsonMapper
     );
-    segmentAnnouncer.start();
 
     List<DataSegment> segments = new ArrayList<DataSegment>();
     try {
       for (int j = 0; j < INITIAL_SEGMENTS / numThreads; ++j) {

View File

@@ -70,8 +70,10 @@ import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.segment.realtime.plumber.Plumber;
 import io.druid.segment.realtime.plumber.PlumberSchool;
 import io.druid.segment.realtime.plumber.Sink;
+import io.druid.server.coordination.DataSegmentServerAnnouncer;
 import io.druid.timeline.partition.LinearShardSpec;
 import io.druid.utils.Runnables;
+import org.easymock.EasyMock;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.Period;
@@ -215,7 +217,8 @@ public class RealtimeManagerTest
                 tuningConfig
             )
         ),
-        null
+        null,
+        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class)
     );
     plumber2 = new TestPlumber(new Sink(
         new Interval("0/P5000Y"),
@@ -234,7 +237,8 @@ public class RealtimeManagerTest
                 tuningConfig
             )
         ),
-        null
+        null,
+        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class)
     );
 
     tuningConfig_0 = new RealtimeTuningConfig(
@@ -319,6 +323,7 @@ public class RealtimeManagerTest
     realtimeManager3 = new RealtimeManager(
         Arrays.asList(department_0, department_1),
         conglomerate,
+        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         ImmutableMap.<String, Map<Integer, RealtimeManager.FireChief>>of(
             "testing",
             ImmutableMap.of(

View File

@@ -235,12 +235,6 @@ public class AppenderatorTester implements AutoCloseable
           {
           }
 
-          @Override
-          public boolean isAnnounced(DataSegment segment)
-          {
-            return false;
-          }
-
         },
         emitter,
         queryExecutor,

View File

@@ -0,0 +1,246 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.server.coordination;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 */
public class SegmentChangeRequestHistoryTest
{
  @Test
  public void testSimple() throws Exception
  {
    SegmentChangeRequestHistory history = new SegmentChangeRequestHistory();
    Assert.assertEquals(0, history.getLastCounter().getCounter());

    history.addSegmentChangeRequest(new SegmentChangeRequestNoop());
    Assert.assertEquals(1, history.getLastCounter().getCounter());

    SegmentChangeRequestsSnapshot snapshot = history.getRequestsSince(SegmentChangeRequestHistory.Counter.ZERO).get();
    Assert.assertEquals(1, snapshot.getRequests().size());
    Assert.assertEquals(1, snapshot.getCounter().getCounter());

    history.addSegmentChangeRequest(new SegmentChangeRequestNoop());
    Assert.assertEquals(2, history.getLastCounter().getCounter());

    snapshot = history.getRequestsSince(snapshot.getCounter()).get();
    Assert.assertEquals(1, snapshot.getRequests().size());
    Assert.assertEquals(2, snapshot.getCounter().getCounter());

    snapshot = history.getRequestsSince(SegmentChangeRequestHistory.Counter.ZERO).get();
    Assert.assertEquals(2, snapshot.getRequests().size());
    Assert.assertEquals(2, snapshot.getCounter().getCounter());
  }
  @Test
  public void testTruncatedHistory() throws Exception
  {
    SegmentChangeRequestHistory history = new SegmentChangeRequestHistory(2);

    history.addSegmentChangeRequest(new SegmentChangeRequestNoop());
    SegmentChangeRequestHistory.Counter one = history.getLastCounter();

    history.addSegmentChangeRequest(new SegmentChangeRequestNoop());
    SegmentChangeRequestHistory.Counter two = history.getLastCounter();

    history.addSegmentChangeRequest(new SegmentChangeRequestNoop());
    SegmentChangeRequestHistory.Counter three = history.getLastCounter();

    history.addSegmentChangeRequest(new SegmentChangeRequestNoop());
    SegmentChangeRequestHistory.Counter four = history.getLastCounter();

    Assert.assertTrue(history.getRequestsSince(SegmentChangeRequestHistory.Counter.ZERO).get().isResetCounter());
    Assert.assertTrue(history.getRequestsSince(one).get().isResetCounter());
    Assert.assertTrue(history.getRequestsSince(two).get().isResetCounter());

    SegmentChangeRequestsSnapshot snapshot = history.getRequestsSince(three).get();
    Assert.assertEquals(1, snapshot.getRequests().size());
    Assert.assertEquals(4, snapshot.getCounter().getCounter());
  }
  @Test
  public void testCounterHashMismatch() throws Exception
  {
    SegmentChangeRequestHistory history = new SegmentChangeRequestHistory(3);

    try {
      history.getRequestsSince(new SegmentChangeRequestHistory.Counter(0, 1234)).get();
      Assert.fail();
    }
    catch (ExecutionException ex) {
      Assert.assertTrue(ex.getCause() instanceof IllegalArgumentException);
    }

    history.addSegmentChangeRequest(new SegmentChangeRequestNoop());
    SegmentChangeRequestHistory.Counter one = history.getLastCounter();

    history.addSegmentChangeRequest(new SegmentChangeRequestNoop());
    SegmentChangeRequestHistory.Counter two = history.getLastCounter();

    try {
      history.getRequestsSince(new SegmentChangeRequestHistory.Counter(0, 1234)).get();
      Assert.fail();
    }
    catch (ExecutionException ex) {
      Assert.assertTrue(ex.getCause() instanceof IllegalArgumentException);
    }

    SegmentChangeRequestsSnapshot snapshot = history.getRequestsSince(one).get();
    Assert.assertEquals(1, snapshot.getRequests().size());
    Assert.assertEquals(2, snapshot.getCounter().getCounter());

    try {
      history.getRequestsSince(new SegmentChangeRequestHistory.Counter(1, 1234)).get();
      Assert.fail();
    }
    catch (ExecutionException ex) {
      Assert.assertTrue(ex.getCause() instanceof IllegalArgumentException);
    }

    history.addSegmentChangeRequest(new SegmentChangeRequestNoop());
    SegmentChangeRequestHistory.Counter three = history.getLastCounter();

    history.addSegmentChangeRequest(new SegmentChangeRequestNoop());
    SegmentChangeRequestHistory.Counter four = history.getLastCounter();

    snapshot = history.getRequestsSince(two).get();
    Assert.assertEquals(2, snapshot.getRequests().size());
    Assert.assertEquals(4, snapshot.getCounter().getCounter());

    try {
      history.getRequestsSince(new SegmentChangeRequestHistory.Counter(2, 1234)).get();
      Assert.fail();
    }
    catch (ExecutionException ex) {
      Assert.assertTrue(ex.getCause() instanceof IllegalArgumentException);
    }
  }
  @Test
  public void testCancel() throws Exception
  {
    final SegmentChangeRequestHistory history = new SegmentChangeRequestHistory();

    ListenableFuture<SegmentChangeRequestsSnapshot> future = history.getRequestsSince(
        SegmentChangeRequestHistory.Counter.ZERO
    );
    Assert.assertEquals(1, history.waitingFutures.size());

    final AtomicBoolean callbackExecuted = new AtomicBoolean(false);
    Futures.addCallback(
        future,
        new FutureCallback<SegmentChangeRequestsSnapshot>()
        {
          @Override
          public void onSuccess(SegmentChangeRequestsSnapshot result)
          {
            callbackExecuted.set(true);
          }

          @Override
          public void onFailure(Throwable t)
          {
            callbackExecuted.set(true);
          }
        }
    );

    future.cancel(true);
    Assert.assertEquals(0, history.waitingFutures.size());
    Assert.assertFalse(callbackExecuted.get());
  }
  @Test
  public void testNonImmediateFuture() throws Exception
  {
    final SegmentChangeRequestHistory history = new SegmentChangeRequestHistory();

    Future<SegmentChangeRequestsSnapshot> future = history.getRequestsSince(
        SegmentChangeRequestHistory.Counter.ZERO
    );
    Assert.assertFalse(future.isDone());

    history.addSegmentChangeRequest(new SegmentChangeRequestNoop());

    SegmentChangeRequestsSnapshot snapshot = future.get(1, TimeUnit.MINUTES);
    Assert.assertEquals(1, snapshot.getCounter().getCounter());
    Assert.assertEquals(1, snapshot.getRequests().size());
  }
  @Test
  public void testCircularBuffer() throws Exception
  {
    SegmentChangeRequestHistory.CircularBuffer<Integer> circularBuffer = new SegmentChangeRequestHistory.CircularBuffer<>(3);

    circularBuffer.add(1);
    Assert.assertEquals(1, circularBuffer.size());
    Assert.assertEquals(1, (int) circularBuffer.get(0));

    circularBuffer.add(2);
    Assert.assertEquals(2, circularBuffer.size());
    for (int i = 0; i < circularBuffer.size(); i++) {
      Assert.assertEquals(i + 1, (int) circularBuffer.get(i));
    }

    circularBuffer.add(3);
    Assert.assertEquals(3, circularBuffer.size());
    for (int i = 0; i < circularBuffer.size(); i++) {
      Assert.assertEquals(i + 1, (int) circularBuffer.get(i));
    }

    circularBuffer.add(4);
    Assert.assertEquals(3, circularBuffer.size());
    for (int i = 0; i < circularBuffer.size(); i++) {
      Assert.assertEquals(i + 2, (int) circularBuffer.get(i));
    }

    circularBuffer.add(5);
    Assert.assertEquals(3, circularBuffer.size());
    for (int i = 0; i < circularBuffer.size(); i++) {
      Assert.assertEquals(i + 3, (int) circularBuffer.get(i));
    }

    circularBuffer.add(6);
    Assert.assertEquals(3, circularBuffer.size());
    for (int i = 0; i < circularBuffer.size(); i++) {
      Assert.assertEquals(i + 4, (int) circularBuffer.get(i));
    }

    circularBuffer.add(7);
    Assert.assertEquals(3, circularBuffer.size());
    for (int i = 0; i < circularBuffer.size(); i++) {
      Assert.assertEquals(i + 5, (int) circularBuffer.get(i));
    }

    circularBuffer.add(8);
    Assert.assertEquals(3, circularBuffer.size());
    for (int i = 0; i < circularBuffer.size(); i++) {
      Assert.assertEquals(i + 6, (int) circularBuffer.get(i));
    }
  }
}
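The last test pins down the buffer contract: capacity is fixed at construction, size() grows only until it reaches that capacity, and get(0) always returns the oldest surviving element. Here is a sketch that satisfies exactly those assertions; the real SegmentChangeRequestHistory.CircularBuffer may differ in detail.

    public class CircularBuffer<E>
    {
      private final Object[] buffer;
      private int start = 0; // index of the oldest element
      private int size = 0;

      public CircularBuffer(int capacity)
      {
        buffer = new Object[capacity];
      }

      public void add(E item)
      {
        // When full, (start + size) wraps back onto start, overwriting the oldest slot.
        buffer[(start + size) % buffer.length] = item;
        if (size < buffer.length) {
          size++;
        } else {
          start = (start + 1) % buffer.length;
        }
      }

      @SuppressWarnings("unchecked")
      public E get(int index) // index 0 is the oldest surviving element
      {
        return (E) buffer[(start + index) % buffer.length];
      }

      public int size()
      {
        return size;
      }
    }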

View File

@@ -29,7 +29,6 @@ import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Module;
 import io.druid.client.cache.CacheConfig;
 import io.druid.client.cache.LocalCacheProvider;
 import io.druid.concurrent.Execs;
@@ -50,6 +49,7 @@ import io.druid.server.metrics.NoopServiceEmitter;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.apache.curator.framework.CuratorFramework;
+import org.easymock.EasyMock;
 import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
@@ -139,6 +139,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
     segmentsAnnouncedByMe = new ConcurrentSkipListSet<>();
     announceCount = new AtomicInteger(0);
+
     announcer = new DataSegmentAnnouncer()
     {
       private final DataSegmentAnnouncer delegate = new BatchDataSegmentAnnouncer(
@@ -184,12 +185,6 @@ public class ZkCoordinatorTest extends CuratorTestBase
         announceCount.addAndGet(-Iterables.size(segments));
         delegate.unannounceSegments(segments);
       }
-
-      @Override
-      public boolean isAnnounced(DataSegment segment)
-      {
-        return segmentsAnnouncedByMe.contains(segment);
-      }
     };
 
     zkCoordinator = new ZkCoordinator(
@@ -223,6 +218,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
         zkPaths,
         me,
         announcer,
+        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         curator,
         serverManager,
         new ScheduledExecutorFactory()
@@ -518,6 +514,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
             binder.bind(DruidServerMetadata.class)
                   .toInstance(new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0));
             binder.bind(DataSegmentAnnouncer.class).toInstance(announcer);
+            binder.bind(DataSegmentServerAnnouncer.class).toInstance(EasyMock.createNiceMock(DataSegmentServerAnnouncer.class));
             binder.bind(CuratorFramework.class).toInstance(curator);
             binder.bind(ServerManager.class).toInstance(serverManager);
             binder.bind(ScheduledExecutorFactory.class).toInstance(ScheduledExecutors.createFactory(new Lifecycle()));

View File

@@ -33,6 +33,8 @@ import io.druid.curator.announcement.Announcer;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.server.coordination.BatchDataSegmentAnnouncer;
 import io.druid.server.coordination.DruidServerMetadata;
+import io.druid.server.coordination.SegmentChangeRequestHistory;
+import io.druid.server.coordination.SegmentChangeRequestsSnapshot;
 import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
 import io.druid.server.initialization.ZkPathsConfig;
 import io.druid.timeline.DataSegment;
@@ -145,7 +147,6 @@ public class BatchDataSegmentAnnouncerTest
         announcer,
         jsonMapper
     );
-    segmentAnnouncer.start();
 
     testSegments = Sets.newHashSet();
     for (int i = 0; i < 100; i++) {
@@ -156,7 +157,6 @@ public class BatchDataSegmentAnnouncerTest
   @After
   public void tearDown() throws Exception
   {
-    segmentAnnouncer.stop();
     announcer.stop();
     cf.close();
     testingCluster.stop();
@@ -185,6 +185,12 @@ public class BatchDataSegmentAnnouncerTest
       Assert.assertEquals(Sets.newHashSet(firstSegment, secondSegment), segments);
     }
 
+    SegmentChangeRequestsSnapshot snapshot = segmentAnnouncer.getSegmentChangesSince(
+        new SegmentChangeRequestHistory.Counter(-1, -1)
+    ).get();
+    Assert.assertEquals(2, snapshot.getRequests().size());
+    Assert.assertEquals(2, snapshot.getCounter().getCounter());
+
     segmentAnnouncer.unannounceSegment(firstSegment);
 
     for (String zNode : zNodes) {
@@ -195,6 +201,18 @@ public class BatchDataSegmentAnnouncerTest
     segmentAnnouncer.unannounceSegment(secondSegment);
     Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
 
+    snapshot = segmentAnnouncer.getSegmentChangesSince(
+        snapshot.getCounter()
+    ).get();
+    Assert.assertEquals(2, snapshot.getRequests().size());
+    Assert.assertEquals(4, snapshot.getCounter().getCounter());
+
+    snapshot = segmentAnnouncer.getSegmentChangesSince(
+        new SegmentChangeRequestHistory.Counter(-1, -1)
+    ).get();
+    Assert.assertEquals(0, snapshot.getRequests().size());
+    Assert.assertEquals(4, snapshot.getCounter().getCounter());
   }
 
   @Test
@@ -272,6 +290,11 @@ public class BatchDataSegmentAnnouncerTest
   @Test
   public void testBatchAnnounce() throws Exception
+  {
+    testBatchAnnounce(true);
+  }
+
+  private void testBatchAnnounce(boolean testHistory) throws Exception
   {
     segmentAnnouncer.announceSegments(testSegments);
@@ -285,16 +308,40 @@ public class BatchDataSegmentAnnouncerTest
     }
 
     Assert.assertEquals(allSegments, testSegments);
 
+    SegmentChangeRequestsSnapshot snapshot = null;
+    if (testHistory) {
+      snapshot = segmentAnnouncer.getSegmentChangesSince(
+          new SegmentChangeRequestHistory.Counter(-1, -1)
+      ).get();
+      Assert.assertEquals(testSegments.size(), snapshot.getRequests().size());
+      Assert.assertEquals(testSegments.size(), snapshot.getCounter().getCounter());
+    }
+
     segmentAnnouncer.unannounceSegments(testSegments);
     Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
+
+    if (testHistory) {
+      snapshot = segmentAnnouncer.getSegmentChangesSince(
+          snapshot.getCounter()
+      ).get();
+      Assert.assertEquals(testSegments.size(), snapshot.getRequests().size());
+      Assert.assertEquals(2 * testSegments.size(), snapshot.getCounter().getCounter());
+
+      snapshot = segmentAnnouncer.getSegmentChangesSince(
+          new SegmentChangeRequestHistory.Counter(-1, -1)
+      ).get();
+      Assert.assertEquals(0, snapshot.getRequests().size());
+      Assert.assertEquals(2 * testSegments.size(), snapshot.getCounter().getCounter());
+    }
   }
 
   @Test
   public void testMultipleBatchAnnounce() throws Exception
   {
     for (int i = 0; i < 10; i++) {
-      testBatchAnnounce();
+      testBatchAnnounce(false);
     }
   }
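Taken together, these assertions spell out the consumer protocol: ask with Counter(-1, -1) for the full current state, then chain each returned counter into the next call to receive only the changes in between. Below is a hypothetical consumer loop built on just that contract; the reset handling mirrors the truncated-history test, and the state-update hook is a placeholder.

    import io.druid.server.coordination.BatchDataSegmentAnnouncer;
    import io.druid.server.coordination.SegmentChangeRequestHistory;
    import io.druid.server.coordination.SegmentChangeRequestsSnapshot;

    public class ChangeLogConsumer
    {
      private final BatchDataSegmentAnnouncer announcer;
      private volatile boolean running = true;

      public ChangeLogConsumer(BatchDataSegmentAnnouncer announcer)
      {
        this.announcer = announcer;
      }

      public void consume() throws Exception
      {
        SegmentChangeRequestHistory.Counter counter = new SegmentChangeRequestHistory.Counter(-1, -1);
        while (running) {
          // Blocks until there are changes newer than the counter we hold.
          SegmentChangeRequestsSnapshot snapshot = announcer.getSegmentChangesSince(counter).get();
          if (snapshot.isResetCounter()) {
            // History was truncated past our position: start over with a full snapshot.
            counter = new SegmentChangeRequestHistory.Counter(-1, -1);
            continue;
          }
          snapshot.getRequests().forEach(request -> { /* apply the change to local state */ });
          counter = snapshot.getCounter();
        }
      }
    }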

View File

@@ -36,7 +36,6 @@ import io.druid.common.config.JacksonConfigManager;
 import io.druid.concurrent.Execs;
 import io.druid.curator.CuratorTestBase;
 import io.druid.curator.discovery.NoopServiceAnnouncer;
-import io.druid.curator.inventory.InventoryManagerConfig;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
 import io.druid.metadata.MetadataRuleManager;
@@ -238,22 +237,6 @@ public class DruidCoordinatorTest extends CuratorTestBase
     loadManagementPeons.put("from", loadQueuePeon);
     loadManagementPeons.put("to", loadQueuePeon);
 
-    EasyMock.expect(serverInventoryView.getInventoryManagerConfig()).andReturn(
-        new InventoryManagerConfig()
-        {
-          @Override
-          public String getContainerPath()
-          {
-            return "";
-          }
-
-          @Override
-          public String getInventoryPath()
-          {
-            return "";
-          }
-        }
-    );
-
     EasyMock.replay(serverInventoryView);
 
     coordinator.moveSegment(

View File

@@ -37,6 +37,7 @@ import io.druid.guice.NodeTypeConfig;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.query.QuerySegmentWalker;
 import io.druid.query.lookup.LookupModule;
+import io.druid.server.http.SegmentListerResource;
 import io.druid.server.metrics.QueryCountStatsProvider;
 import io.druid.server.QueryResource;
 import io.druid.server.coordination.ServerManager;
@@ -86,6 +87,7 @@ public class CliHistorical extends ServerRunnable
         binder.bind(QueryCountStatsProvider.class).to(QueryResource.class).in(LazySingleton.class);
         Jerseys.addResource(binder, QueryResource.class);
         Jerseys.addResource(binder, HistoricalResource.class);
+        Jerseys.addResource(binder, SegmentListerResource.class);
         LifecycleModule.register(binder, QueryResource.class);
         LifecycleModule.register(binder, ZkCoordinator.class);

View File

@@ -84,6 +84,7 @@ import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
 import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig;
 import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
+import io.druid.server.http.SegmentListerResource;
 import io.druid.server.metrics.QueryCountStatsProvider;
 import io.druid.server.QueryResource;
 import io.druid.server.initialization.jetty.ChatHandlerServerModule;
@@ -204,6 +205,7 @@ public class CliPeon extends GuiceRunnable
         binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
         binder.bind(QueryCountStatsProvider.class).to(QueryResource.class).in(LazySingleton.class);
         Jerseys.addResource(binder, QueryResource.class);
+        Jerseys.addResource(binder, SegmentListerResource.class);
         LifecycleModule.register(binder, QueryResource.class);
         binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType));
         LifecycleModule.register(binder, Server.class);

View File

@@ -24,7 +24,6 @@ import com.google.inject.Binder;
 import com.google.inject.Inject;
 import com.google.inject.Module;
 import com.google.inject.name.Names;
 import io.airlift.airline.Command;
 import io.druid.client.DruidServer;
 import io.druid.client.InventoryView;
@@ -118,6 +117,18 @@ public class CliRealtimeExample extends ServerRunnable
     {
       return ImmutableList.of();
     }
+
+    @Override
+    public boolean isStarted()
+    {
+      return true;
+    }
+
+    @Override
+    public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
+    {
+      return false;
+    }
   }
 
   private static class NoopDataSegmentPusher implements DataSegmentPusher
@@ -168,11 +179,5 @@ public class CliRealtimeExample extends ServerRunnable
     {
       // do nothing
     }
-
-    @Override
-    public boolean isAnnounced(DataSegment segment)
-    {
-      return false;
-    }
   }
 }

View File

@@ -39,9 +39,10 @@ import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
 import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig;
 import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
-import io.druid.server.metrics.QueryCountStatsProvider;
 import io.druid.server.QueryResource;
+import io.druid.server.http.SegmentListerResource;
 import io.druid.server.initialization.jetty.JettyServerInitializer;
+import io.druid.server.metrics.QueryCountStatsProvider;
 import org.eclipse.jetty.server.Server;
 
 import java.util.List;
@@ -105,6 +106,7 @@ public class RealtimeModule implements Module
     binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
     binder.bind(QueryCountStatsProvider.class).to(QueryResource.class).in(LazySingleton.class);
     Jerseys.addResource(binder, QueryResource.class);
+    Jerseys.addResource(binder, SegmentListerResource.class);
     LifecycleModule.register(binder, QueryResource.class);
     LifecycleModule.register(binder, Server.class);
   }