mirror of https://github.com/apache/druid.git
add read to batch zk
This commit is contained in:
parent
b6aeb5b376
commit
3f97ac9253
|
@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
|
|||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.concurrent.ScheduledExecutorFactory;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
|
@ -33,11 +34,13 @@ import com.metamx.common.lifecycle.Lifecycle;
|
|||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.client.BatchingServerInventoryView;
|
||||
import com.metamx.druid.client.DruidServerConfig;
|
||||
import com.metamx.druid.client.InventoryView;
|
||||
import com.metamx.druid.client.ServerInventoryView;
|
||||
import com.metamx.druid.client.ServerInventoryViewConfig;
|
||||
import com.metamx.druid.client.ServerView;
|
||||
import com.metamx.druid.client.SingleServerInventoryView;
|
||||
import com.metamx.druid.concurrent.Execs;
|
||||
import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer;
|
||||
import com.metamx.druid.coordination.BatchingCuratorDataSegmentAnnouncer;
|
||||
|
@ -343,7 +346,8 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
|
|||
}
|
||||
}
|
||||
|
||||
private void initializeInventoryView()
|
||||
private void
|
||||
initializeInventoryView()
|
||||
{
|
||||
if (inventoryView == null) {
|
||||
initializeServerInventoryView();
|
||||
|
@ -357,13 +361,29 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
|
|||
final ExecutorService exec = Executors.newFixedThreadPool(
|
||||
1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build()
|
||||
);
|
||||
serverInventoryView = new ServerInventoryView(
|
||||
getConfigFactory().build(ServerInventoryViewConfig.class),
|
||||
|
||||
final ServerInventoryViewConfig serverInventoryViewConfig = getConfigFactory().build(ServerInventoryViewConfig.class);
|
||||
final String announcerType = serverInventoryViewConfig.getAnnouncerType();
|
||||
|
||||
if ("legacy".equalsIgnoreCase(announcerType)) {
|
||||
serverInventoryView = new SingleServerInventoryView(
|
||||
serverInventoryViewConfig,
|
||||
getZkPaths(),
|
||||
getCuratorFramework(),
|
||||
exec,
|
||||
getJsonMapper()
|
||||
);
|
||||
} else if ("batch".equalsIgnoreCase(announcerType)) {
|
||||
serverInventoryView = new BatchingServerInventoryView(
|
||||
serverInventoryViewConfig,
|
||||
getZkPaths(),
|
||||
getCuratorFramework(),
|
||||
exec,
|
||||
getJsonMapper()
|
||||
);
|
||||
} else {
|
||||
throw new IAE("Unknown type %s", announcerType);
|
||||
}
|
||||
lifecycle.addManagedInstance(serverInventoryView);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.client;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.MapMaker;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.druid.initialization.ZkPathsConfig;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class BatchingServerInventoryView extends ServerInventoryView<Set<DataSegment>>
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(BatchingServerInventoryView.class);
|
||||
|
||||
final ConcurrentMap<String, Set<DataSegment>> zNodes = new MapMaker().makeMap();
|
||||
|
||||
public BatchingServerInventoryView(
|
||||
final ServerInventoryViewConfig config,
|
||||
final ZkPathsConfig zkPaths,
|
||||
final CuratorFramework curator,
|
||||
final ExecutorService exec,
|
||||
final ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
super(
|
||||
config, zkPaths, curator, exec, jsonMapper, new TypeReference<Set<DataSegment>>()
|
||||
{
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DruidServer addInnerInventory(
|
||||
final DruidServer container,
|
||||
String inventoryKey,
|
||||
final Set<DataSegment> inventory
|
||||
)
|
||||
{
|
||||
zNodes.put(inventoryKey, inventory);
|
||||
for (DataSegment segment : inventory) {
|
||||
addSingleInventory(container, segment);
|
||||
}
|
||||
return container;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DruidServer updateInnerInventory(
|
||||
DruidServer container, String inventoryKey, Set<DataSegment> inventory
|
||||
)
|
||||
{
|
||||
Set<DataSegment> existing = zNodes.get(inventoryKey);
|
||||
if (existing == null) {
|
||||
throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey);
|
||||
}
|
||||
|
||||
for (DataSegment segment : Sets.difference(inventory, existing)) {
|
||||
addSingleInventory(container, segment);
|
||||
}
|
||||
for (DataSegment segment : Sets.difference(existing, inventory)) {
|
||||
removeSingleInventory(container, segment.getIdentifier());
|
||||
}
|
||||
|
||||
return container;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DruidServer removeInnerInventory(final DruidServer container, String inventoryKey)
|
||||
{
|
||||
log.info("Server[%s] removed container[%s]", container.getName(), inventoryKey);
|
||||
Set<DataSegment> segments = zNodes.remove(inventoryKey);
|
||||
|
||||
if (segments == null) {
|
||||
log.warn("Told to remove container[%s], which didn't exist", inventoryKey);
|
||||
return container;
|
||||
}
|
||||
|
||||
for (DataSegment segment : segments) {
|
||||
removeSingleInventory(container, segment.getIdentifier());
|
||||
}
|
||||
return container;
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@
|
|||
package com.metamx.druid.client;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Throwables;
|
||||
|
@ -43,27 +44,30 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
|
||||
/**
|
||||
*/
|
||||
public class ServerInventoryView implements ServerView, InventoryView
|
||||
public abstract class ServerInventoryView<InventoryType> implements ServerView, InventoryView
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(ServerInventoryView.class);
|
||||
protected static final EmittingLogger log = new EmittingLogger(ServerInventoryView.class);
|
||||
|
||||
private final CuratorInventoryManager<DruidServer, DataSegment> inventoryManager;
|
||||
private ServerInventoryViewConfig config;
|
||||
private final CuratorInventoryManager<DruidServer, InventoryType> inventoryManager;
|
||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||
|
||||
private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new MapMaker().makeMap();
|
||||
private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new MapMaker().makeMap();
|
||||
protected final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new MapMaker().makeMap();
|
||||
protected final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new MapMaker().makeMap();
|
||||
|
||||
private static final Map<String, Integer> removedSegments = new MapMaker().makeMap();
|
||||
protected static final Map<String, Integer> removedSegments = new MapMaker().makeMap();
|
||||
|
||||
public ServerInventoryView(
|
||||
final ServerInventoryViewConfig config,
|
||||
final ZkPathsConfig zkPaths,
|
||||
final CuratorFramework curator,
|
||||
final ExecutorService exec,
|
||||
final ObjectMapper jsonMapper
|
||||
final ObjectMapper jsonMapper,
|
||||
final TypeReference<InventoryType> typeReference
|
||||
)
|
||||
{
|
||||
inventoryManager = new CuratorInventoryManager<DruidServer, DataSegment>(
|
||||
this.config = config;
|
||||
this.inventoryManager = new CuratorInventoryManager<DruidServer, InventoryType>(
|
||||
curator,
|
||||
new InventoryManagerConfig()
|
||||
{
|
||||
|
@ -80,7 +84,7 @@ public class ServerInventoryView implements ServerView, InventoryView
|
|||
}
|
||||
},
|
||||
exec,
|
||||
new CuratorInventoryManagerStrategy<DruidServer, DataSegment>()
|
||||
new CuratorInventoryManagerStrategy<DruidServer, InventoryType>()
|
||||
{
|
||||
@Override
|
||||
public DruidServer deserializeContainer(byte[] bytes)
|
||||
|
@ -105,10 +109,10 @@ public class ServerInventoryView implements ServerView, InventoryView
|
|||
}
|
||||
|
||||
@Override
|
||||
public DataSegment deserializeInventory(byte[] bytes)
|
||||
public InventoryType deserializeInventory(byte[] bytes)
|
||||
{
|
||||
try {
|
||||
return jsonMapper.readValue(bytes, DataSegment.class);
|
||||
return jsonMapper.readValue(bytes, typeReference);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
@ -116,7 +120,7 @@ public class ServerInventoryView implements ServerView, InventoryView
|
|||
}
|
||||
|
||||
@Override
|
||||
public byte[] serializeInventory(DataSegment inventory)
|
||||
public byte[] serializeInventory(InventoryType inventory)
|
||||
{
|
||||
try {
|
||||
return jsonMapper.writeValueAsBytes(inventory);
|
||||
|
@ -146,67 +150,27 @@ public class ServerInventoryView implements ServerView, InventoryView
|
|||
}
|
||||
|
||||
@Override
|
||||
public DruidServer addInventory(final DruidServer container, String inventoryKey, final DataSegment inventory)
|
||||
public DruidServer addInventory(
|
||||
final DruidServer container,
|
||||
String inventoryKey,
|
||||
final InventoryType inventory
|
||||
)
|
||||
{
|
||||
log.info("Server[%s] added segment[%s]", container.getName(), inventoryKey);
|
||||
|
||||
if (container.getSegment(inventoryKey) != null) {
|
||||
log.warn(
|
||||
"Not adding or running callbacks for existing segment[%s] on server[%s]",
|
||||
inventoryKey,
|
||||
container.getName()
|
||||
);
|
||||
|
||||
return container;
|
||||
return addInnerInventory(container, inventoryKey, inventory);
|
||||
}
|
||||
|
||||
final DruidServer retVal = container.addDataSegment(inventoryKey, inventory);
|
||||
|
||||
runSegmentCallbacks(
|
||||
new Function<SegmentCallback, CallbackAction>()
|
||||
{
|
||||
@Override
|
||||
public CallbackAction apply(SegmentCallback input)
|
||||
public DruidServer updateInventory(
|
||||
DruidServer container, String inventoryKey, InventoryType inventory
|
||||
)
|
||||
{
|
||||
return input.segmentAdded(container, inventory);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
return retVal;
|
||||
return updateInnerInventory(container, inventoryKey, inventory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidServer removeInventory(final DruidServer container, String inventoryKey)
|
||||
{
|
||||
log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
|
||||
final DataSegment segment = container.getSegment(inventoryKey);
|
||||
|
||||
if (segment == null) {
|
||||
log.warn(
|
||||
"Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
|
||||
inventoryKey,
|
||||
container.getName()
|
||||
);
|
||||
|
||||
return container;
|
||||
}
|
||||
|
||||
final DruidServer retVal = container.removeDataSegment(inventoryKey);
|
||||
|
||||
runSegmentCallbacks(
|
||||
new Function<SegmentCallback, CallbackAction>()
|
||||
{
|
||||
@Override
|
||||
public CallbackAction apply(SegmentCallback input)
|
||||
{
|
||||
return input.segmentRemoved(container, segment);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
removedSegments.put(inventoryKey, config.getRemovedSegmentLifetime());
|
||||
return retVal;
|
||||
return removeInnerInventory(container, inventoryKey);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@ -282,7 +246,7 @@ public class ServerInventoryView implements ServerView, InventoryView
|
|||
segmentCallbacks.put(callback, exec);
|
||||
}
|
||||
|
||||
private void runSegmentCallbacks(
|
||||
protected void runSegmentCallbacks(
|
||||
final Function<SegmentCallback, CallbackAction> fn
|
||||
)
|
||||
{
|
||||
|
@ -302,7 +266,7 @@ public class ServerInventoryView implements ServerView, InventoryView
|
|||
}
|
||||
}
|
||||
|
||||
private void runServerCallbacks(final DruidServer server)
|
||||
protected void runServerCallbacks(final DruidServer server)
|
||||
{
|
||||
for (final Map.Entry<ServerCallback, Executor> entry : serverCallbacks.entrySet()) {
|
||||
entry.getValue().execute(
|
||||
|
@ -319,4 +283,83 @@ public class ServerInventoryView implements ServerView, InventoryView
|
|||
);
|
||||
}
|
||||
}
|
||||
|
||||
protected void addSingleInventory(
|
||||
final DruidServer container,
|
||||
final DataSegment inventory
|
||||
)
|
||||
{
|
||||
log.info("Server[%s] added segment[%s]", container.getName(), inventory.getIdentifier());
|
||||
|
||||
if (container.getSegment(inventory.getIdentifier()) != null) {
|
||||
log.warn(
|
||||
"Not adding or running callbacks for existing segment[%s] on server[%s]",
|
||||
inventory.getIdentifier(),
|
||||
container.getName()
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
container.addDataSegment(inventory.getIdentifier(), inventory);
|
||||
|
||||
runSegmentCallbacks(
|
||||
new Function<SegmentCallback, CallbackAction>()
|
||||
{
|
||||
@Override
|
||||
public CallbackAction apply(SegmentCallback input)
|
||||
{
|
||||
return input.segmentAdded(container, inventory);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
protected void removeSingleInventory(final DruidServer container, String inventoryKey)
|
||||
{
|
||||
log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
|
||||
final DataSegment segment = container.getSegment(inventoryKey);
|
||||
|
||||
if (segment == null) {
|
||||
log.warn(
|
||||
"Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
|
||||
inventoryKey,
|
||||
container.getName()
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
container.removeDataSegment(inventoryKey);
|
||||
|
||||
runSegmentCallbacks(
|
||||
new Function<SegmentCallback, CallbackAction>()
|
||||
{
|
||||
@Override
|
||||
public CallbackAction apply(SegmentCallback input)
|
||||
{
|
||||
return input.segmentRemoved(container, segment);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
removedSegments.put(inventoryKey, config.getRemovedSegmentLifetime());
|
||||
}
|
||||
|
||||
protected abstract DruidServer addInnerInventory(
|
||||
final DruidServer container,
|
||||
String inventoryKey,
|
||||
final InventoryType inventory
|
||||
);
|
||||
|
||||
protected abstract DruidServer updateInnerInventory(
|
||||
final DruidServer container,
|
||||
String inventoryKey,
|
||||
final InventoryType inventory
|
||||
);
|
||||
|
||||
protected abstract DruidServer removeInnerInventory(
|
||||
final DruidServer container,
|
||||
String inventoryKey
|
||||
);
|
||||
}
|
||||
|
|
|
@ -29,4 +29,8 @@ public abstract class ServerInventoryViewConfig
|
|||
@Config("druid.master.removedSegmentLifetime")
|
||||
@Default("1")
|
||||
public abstract int getRemovedSegmentLifetime();
|
||||
|
||||
@Config("druid.announcer.type")
|
||||
@Default("legacy")
|
||||
public abstract String getAnnouncerType();
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.client;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.metamx.druid.initialization.ZkPathsConfig;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
|
||||
{
|
||||
public SingleServerInventoryView(
|
||||
final ServerInventoryViewConfig config,
|
||||
final ZkPathsConfig zkPaths,
|
||||
final CuratorFramework curator,
|
||||
final ExecutorService exec,
|
||||
final ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
super(
|
||||
config, zkPaths, curator, exec, jsonMapper, new TypeReference<DataSegment>()
|
||||
{
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DruidServer addInnerInventory(
|
||||
DruidServer container, String inventoryKey, DataSegment inventory
|
||||
)
|
||||
{
|
||||
addSingleInventory(container, inventory);
|
||||
return container;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DruidServer updateInnerInventory(
|
||||
DruidServer container, String inventoryKey, DataSegment inventory
|
||||
)
|
||||
{
|
||||
return addInnerInventory(container, inventoryKey, inventory);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DruidServer removeInnerInventory(DruidServer container, String inventoryKey)
|
||||
{
|
||||
removeSingleInventory(container, inventoryKey);
|
||||
return container;
|
||||
}
|
||||
}
|
|
@ -290,11 +290,18 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
|
||||
switch (event.getType()) {
|
||||
case CHILD_ADDED:
|
||||
case CHILD_UPDATED:
|
||||
final InventoryClass inventory = strategy.deserializeInventory(child.getData());
|
||||
final InventoryClass addedInventory = strategy.deserializeInventory(child.getData());
|
||||
|
||||
synchronized (holder) {
|
||||
holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, inventory));
|
||||
holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, addedInventory));
|
||||
}
|
||||
|
||||
break;
|
||||
case CHILD_UPDATED:
|
||||
final InventoryClass updatedInventory = strategy.deserializeInventory(child.getData());
|
||||
|
||||
synchronized (holder) {
|
||||
holder.setContainer(strategy.updateInventory(holder.getContainer(), inventoryKey, updatedInventory));
|
||||
}
|
||||
|
||||
break;
|
||||
|
|
|
@ -33,5 +33,6 @@ public interface CuratorInventoryManagerStrategy<ContainerClass, InventoryClass>
|
|||
public void deadContainer(ContainerClass deadContainer);
|
||||
public ContainerClass updateContainer(ContainerClass oldContainer, ContainerClass newContainer);
|
||||
public ContainerClass addInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
|
||||
public ContainerClass updateInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
|
||||
public ContainerClass removeInventory(ContainerClass container, String inventoryKey);
|
||||
}
|
||||
|
|
|
@ -111,6 +111,12 @@ public class BatchingCuratorDataSegmentAnnouncerTest
|
|||
{
|
||||
return 100000;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAnnouncerType()
|
||||
{
|
||||
return "batch";
|
||||
}
|
||||
},
|
||||
announcer,
|
||||
jsonMapper
|
||||
|
|
|
@ -209,6 +209,14 @@ public class CuratorInventoryManagerTest extends CuratorTestBase
|
|||
return container;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Integer> updateInventory(
|
||||
Map<String, Integer> container, String inventoryKey, Integer inventory
|
||||
)
|
||||
{
|
||||
return addInventory(container, inventoryKey, inventory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Integer> removeInventory(Map<String, Integer> container, String inventoryKey)
|
||||
{
|
||||
|
|
|
@ -27,13 +27,16 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.servlet.GuiceFilter;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.concurrent.ScheduledExecutorFactory;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
import com.metamx.common.config.Config;
|
||||
import com.metamx.common.lifecycle.Lifecycle;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.client.BatchingServerInventoryView;
|
||||
import com.metamx.druid.client.ServerInventoryView;
|
||||
import com.metamx.druid.client.ServerInventoryViewConfig;
|
||||
import com.metamx.druid.client.SingleServerInventoryView;
|
||||
import com.metamx.druid.client.indexing.IndexingServiceClient;
|
||||
import com.metamx.druid.concurrent.Execs;
|
||||
import com.metamx.druid.config.ConfigManager;
|
||||
|
@ -131,9 +134,31 @@ public class MasterMain
|
|||
final ExecutorService exec = Executors.newFixedThreadPool(
|
||||
1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build()
|
||||
);
|
||||
ServerInventoryView serverInventoryView = new ServerInventoryView(
|
||||
configFactory.build(ServerInventoryViewConfig.class), zkPaths, curatorFramework, exec, jsonMapper
|
||||
|
||||
final ServerInventoryViewConfig serverInventoryViewConfig = configFactory.build(ServerInventoryViewConfig.class);
|
||||
final String announcerType = serverInventoryViewConfig.getAnnouncerType();
|
||||
|
||||
final ServerInventoryView serverInventoryView;
|
||||
if ("legacy".equalsIgnoreCase(announcerType)) {
|
||||
serverInventoryView = new SingleServerInventoryView(
|
||||
serverInventoryViewConfig,
|
||||
zkPaths,
|
||||
curatorFramework,
|
||||
exec,
|
||||
jsonMapper
|
||||
);
|
||||
} else if ("batch".equalsIgnoreCase(announcerType)) {
|
||||
serverInventoryView = new BatchingServerInventoryView(
|
||||
serverInventoryViewConfig,
|
||||
zkPaths,
|
||||
curatorFramework,
|
||||
exec,
|
||||
jsonMapper
|
||||
);
|
||||
} else {
|
||||
throw new IAE("Unknown type %s", announcerType);
|
||||
}
|
||||
|
||||
lifecycle.addManagedInstance(serverInventoryView);
|
||||
|
||||
final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
|
||||
|
|
|
@ -85,7 +85,7 @@ public class DruidMaster
|
|||
private final ZkPathsConfig zkPaths;
|
||||
private final JacksonConfigManager configManager;
|
||||
private final DatabaseSegmentManager databaseSegmentManager;
|
||||
private final ServerInventoryView serverInventoryView;
|
||||
private final ServerInventoryView<Object> serverInventoryView;
|
||||
private final DatabaseRuleManager databaseRuleManager;
|
||||
private final CuratorFramework curator;
|
||||
private final ServiceEmitter emitter;
|
||||
|
|
|
@ -23,7 +23,7 @@ import com.google.common.collect.MapMaker;
|
|||
import com.metamx.common.concurrent.ScheduledExecutorFactory;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.client.DruidServer;
|
||||
import com.metamx.druid.client.ServerInventoryView;
|
||||
import com.metamx.druid.client.SingleServerInventoryView;
|
||||
import com.metamx.druid.db.DatabaseSegmentManager;
|
||||
import com.metamx.druid.initialization.ZkPathsConfig;
|
||||
import com.metamx.druid.metrics.NoopServiceEmitter;
|
||||
|
@ -44,7 +44,7 @@ public class DruidMasterTest
|
|||
private CuratorFramework curator;
|
||||
private LoadQueueTaskMaster taskMaster;
|
||||
private DatabaseSegmentManager databaseSegmentManager;
|
||||
private ServerInventoryView serverInventoryView;
|
||||
private SingleServerInventoryView serverInventoryView;
|
||||
private ScheduledExecutorFactory scheduledExecutorFactory;
|
||||
private DruidServer druidServer;
|
||||
private DataSegment segment;
|
||||
|
@ -58,7 +58,7 @@ public class DruidMasterTest
|
|||
segment = EasyMock.createNiceMock(DataSegment.class);
|
||||
loadQueuePeon = EasyMock.createNiceMock(LoadQueuePeon.class);
|
||||
loadManagementPeons = new MapMaker().makeMap();
|
||||
serverInventoryView = EasyMock.createMock(ServerInventoryView.class);
|
||||
serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);
|
||||
|
||||
databaseSegmentManager = EasyMock.createNiceMock(DatabaseSegmentManager.class);
|
||||
EasyMock.replay(databaseSegmentManager);
|
||||
|
|
Loading…
Reference in New Issue