Merge pull request #194 from metamx/batch-zk

Changed the method of publishing segment ownership to ZooKeeper to use batching; includes changes to allow for migration
cheddar 2013-07-23 16:36:48 -07:00
commit 86dd1a499c
20 changed files with 762 additions and 153 deletions
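The migration knobs this change introduces are plain runtime properties (added below in ServerInventoryViewConfig, ZkDataSegmentAnnouncerConfig, and CuratorConfig). A minimal runtime.properties sketch for opting a node into the new behavior, assuming the defaults shown in the diffs:

# Segment-announcement format. The default "legacy" announces in both the old
# per-segment format and the new batched format, so mixed clusters keep working
# during a rolling upgrade; "batch" switches a node to batched znodes only.
druid.announcer.type=batch

# Gzip compression of znode payloads via Curator. Defaults to false so it can
# stay off until every node runs code that can read compressed data (see the
# Initialization change below).
druid.curator.compression.enable=true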

QueryableNode.java

@@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.metamx.common.IAE;
 import com.metamx.common.ISE;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.common.concurrent.ScheduledExecutors;
@@ -33,14 +34,16 @@ import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.common.logger.Logger;
+import com.metamx.druid.client.BatchServerInventoryView;
 import com.metamx.druid.client.DruidServerConfig;
 import com.metamx.druid.client.InventoryView;
 import com.metamx.druid.client.ServerInventoryView;
 import com.metamx.druid.client.ServerInventoryViewConfig;
 import com.metamx.druid.client.ServerView;
+import com.metamx.druid.client.SingleServerInventoryView;
 import com.metamx.druid.concurrent.Execs;
 import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer;
-import com.metamx.druid.coordination.BatchingDataSegmentAnnouncer;
+import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
 import com.metamx.druid.coordination.DataSegmentAnnouncer;
 import com.metamx.druid.coordination.DruidServerMetadata;
 import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
@@ -357,13 +360,29 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
       final ExecutorService exec = Executors.newFixedThreadPool(
           1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build()
       );
-      serverInventoryView = new ServerInventoryView(
-          getConfigFactory().build(ServerInventoryViewConfig.class),
-          getZkPaths(),
-          getCuratorFramework(),
-          exec,
-          getJsonMapper()
-      );
+
+      final ServerInventoryViewConfig serverInventoryViewConfig = getConfigFactory().build(ServerInventoryViewConfig.class);
+      final String announcerType = serverInventoryViewConfig.getAnnouncerType();
+
+      if ("legacy".equalsIgnoreCase(announcerType)) {
+        serverInventoryView = new SingleServerInventoryView(
+            serverInventoryViewConfig,
+            getZkPaths(),
+            getCuratorFramework(),
+            exec,
+            getJsonMapper()
+        );
+      } else if ("batch".equalsIgnoreCase(announcerType)) {
+        serverInventoryView = new BatchServerInventoryView(
+            serverInventoryViewConfig,
+            getZkPaths(),
+            getCuratorFramework(),
+            exec,
+            getJsonMapper()
+        );
+      } else {
+        throw new IAE("Unknown type %s", announcerType);
+      }
+
       lifecycle.addManagedInstance(serverInventoryView);
     }
   }
@@ -373,18 +392,21 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
     if (requestLogger == null) {
       try {
         final String loggingType = props.getProperty("druid.request.logging.type");
-        if("emitter".equals(loggingType)) {
-          setRequestLogger(Initialization.makeEmittingRequestLogger(
-              getProps(),
-              getEmitter()
-          ));
-        }
-        else if ("file".equalsIgnoreCase(loggingType)) {
-          setRequestLogger(Initialization.makeFileRequestLogger(
-              getJsonMapper(),
-              getScheduledExecutorFactory(),
-              getProps()
-          ));
+        if ("emitter".equals(loggingType)) {
+          setRequestLogger(
+              Initialization.makeEmittingRequestLogger(
+                  getProps(),
+                  getEmitter()
+              )
+          );
+        } else if ("file".equalsIgnoreCase(loggingType)) {
+          setRequestLogger(
+              Initialization.makeFileRequestLogger(
+                  getJsonMapper(),
+                  getScheduledExecutorFactory(),
+                  getProps()
+              )
+          );
         } else {
           setRequestLogger(new NoopRequestLogger());
         }
@@ -428,19 +450,39 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
       final Announcer announcer = new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s"));
       lifecycle.addManagedInstance(announcer);

-      setAnnouncer(
-          new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
-              Arrays.<AbstractDataSegmentAnnouncer>asList(
-                  new BatchingDataSegmentAnnouncer(
-                      getDruidServerMetadata(),
-                      getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class),
-                      announcer,
-                      getJsonMapper()
-                  ),
-                  new SingleDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper())
-              )
-          )
-      );
+      final ZkDataSegmentAnnouncerConfig config = getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class);
+      final String announcerType = config.getAnnouncerType();
+
+      final DataSegmentAnnouncer dataSegmentAnnouncer;
+      if ("batch".equalsIgnoreCase(announcerType)) {
+        dataSegmentAnnouncer = new BatchDataSegmentAnnouncer(
+            getDruidServerMetadata(),
+            config,
+            announcer,
+            getJsonMapper()
+        );
+      } else if ("legacy".equalsIgnoreCase(announcerType)) {
+        dataSegmentAnnouncer = new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
+            Arrays.<AbstractDataSegmentAnnouncer>asList(
+                new BatchDataSegmentAnnouncer(
+                    getDruidServerMetadata(),
+                    config,
+                    announcer,
+                    getJsonMapper()
+                ),
+                new SingleDataSegmentAnnouncer(
+                    getDruidServerMetadata(),
+                    getZkPaths(),
+                    announcer,
+                    getJsonMapper()
+                )
+            )
+        );
+      } else {
+        throw new ISE("Unknown announcer type [%s]", announcerType);
+      }
+
+      setAnnouncer(dataSegmentAnnouncer);
       lifecycle.addManagedInstance(getAnnouncer(), Lifecycle.Stage.LAST);
     }

BatchServerInventoryView.java (new file)

@@ -0,0 +1,129 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.MapMaker;
+import com.google.common.collect.Sets;
+import com.metamx.common.ISE;
+import com.metamx.druid.curator.inventory.InventoryManagerConfig;
+import com.metamx.druid.initialization.ZkPathsConfig;
+import com.metamx.emitter.EmittingLogger;
+import org.apache.curator.framework.CuratorFramework;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
+/**
+ */
+public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>>
+{
+  private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);
+
+  final ConcurrentMap<String, Set<DataSegment>> zNodes = new MapMaker().makeMap();
+
+  public BatchServerInventoryView(
+      final ServerInventoryViewConfig config,
+      final ZkPathsConfig zkPaths,
+      final CuratorFramework curator,
+      final ExecutorService exec,
+      final ObjectMapper jsonMapper
+  )
+  {
+    super(
+        config,
+        log,
+        new InventoryManagerConfig()
+        {
+          @Override
+          public String getContainerPath()
+          {
+            return zkPaths.getAnnouncementsPath();
+          }
+
+          @Override
+          public String getInventoryPath()
+          {
+            return zkPaths.getLiveSegmentsPath();
+          }
+        },
+        curator,
+        exec,
+        jsonMapper,
+        new TypeReference<Set<DataSegment>>()
+        {
+        }
+    );
+  }
+
+  @Override
+  protected DruidServer addInnerInventory(
+      final DruidServer container,
+      String inventoryKey,
+      final Set<DataSegment> inventory
+  )
+  {
+    zNodes.put(inventoryKey, inventory);
+    for (DataSegment segment : inventory) {
+      addSingleInventory(container, segment);
+    }
+    return container;
+  }
+
+  @Override
+  protected DruidServer updateInnerInventory(
+      DruidServer container, String inventoryKey, Set<DataSegment> inventory
+  )
+  {
+    Set<DataSegment> existing = zNodes.get(inventoryKey);
+    if (existing == null) {
+      throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey);
+    }
+
+    for (DataSegment segment : Sets.difference(inventory, existing)) {
+      addSingleInventory(container, segment);
+    }
+    for (DataSegment segment : Sets.difference(existing, inventory)) {
+      removeSingleInventory(container, segment.getIdentifier());
+    }
+    zNodes.put(inventoryKey, inventory);
+
+    return container;
+  }
+
+  @Override
+  protected DruidServer removeInnerInventory(final DruidServer container, String inventoryKey)
+  {
+    log.info("Server[%s] removed container[%s]", container.getName(), inventoryKey);
+
+    Set<DataSegment> segments = zNodes.remove(inventoryKey);
+    if (segments == null) {
+      log.warn("Told to remove container[%s], which didn't exist", inventoryKey);
+      return container;
+    }
+
+    for (DataSegment segment : segments) {
+      removeSingleInventory(container, segment.getIdentifier());
+    }
+    return container;
+  }
+}

ServerInventoryView.java

@@ -20,16 +20,17 @@
 package com.metamx.druid.client;

 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.collect.MapMaker;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
+import com.metamx.common.logger.Logger;
 import com.metamx.druid.curator.inventory.CuratorInventoryManager;
 import com.metamx.druid.curator.inventory.CuratorInventoryManagerStrategy;
 import com.metamx.druid.curator.inventory.InventoryManagerConfig;
-import com.metamx.druid.initialization.ZkPathsConfig;
 import com.metamx.emitter.EmittingLogger;
 import org.apache.curator.framework.CuratorFramework;
@@ -43,44 +44,35 @@ import java.util.concurrent.atomic.AtomicBoolean;

 /**
  */
-public class ServerInventoryView implements ServerView, InventoryView
+public abstract class ServerInventoryView<InventoryType> implements ServerView, InventoryView
 {
-  private static final EmittingLogger log = new EmittingLogger(ServerInventoryView.class);
+  private final ServerInventoryViewConfig config;
+  private final Logger log;
-  private final CuratorInventoryManager<DruidServer, DataSegment> inventoryManager;
+  private final CuratorInventoryManager<DruidServer, InventoryType> inventoryManager;
   private final AtomicBoolean started = new AtomicBoolean(false);

   private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new MapMaker().makeMap();
   private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new MapMaker().makeMap();

-  private static final Map<String, Integer> removedSegments = new MapMaker().makeMap();
+  private final Map<String, Integer> removedSegments = new MapMaker().makeMap();

   public ServerInventoryView(
       final ServerInventoryViewConfig config,
-      final ZkPathsConfig zkPaths,
+      final Logger log,
+      final InventoryManagerConfig inventoryManagerConfig,
       final CuratorFramework curator,
       final ExecutorService exec,
-      final ObjectMapper jsonMapper
+      final ObjectMapper jsonMapper,
+      final TypeReference<InventoryType> typeReference
   )
   {
-    inventoryManager = new CuratorInventoryManager<DruidServer, DataSegment>(
+    this.config = config;
+    this.log = log;
+    this.inventoryManager = new CuratorInventoryManager<DruidServer, InventoryType>(
         curator,
-        new InventoryManagerConfig()
-        {
-          @Override
-          public String getContainerPath()
-          {
-            return zkPaths.getAnnouncementsPath();
-          }
-
-          @Override
-          public String getInventoryPath()
-          {
-            return zkPaths.getServedSegmentsPath();
-          }
-        },
+        inventoryManagerConfig,
        exec,
-        new CuratorInventoryManagerStrategy<DruidServer, DataSegment>()
+        new CuratorInventoryManagerStrategy<DruidServer, InventoryType>()
         {
           @Override
           public DruidServer deserializeContainer(byte[] bytes)
@@ -105,10 +97,10 @@ public class ServerInventoryView implements ServerView, InventoryView
           }

           @Override
-          public DataSegment deserializeInventory(byte[] bytes)
+          public InventoryType deserializeInventory(byte[] bytes)
           {
             try {
-              return jsonMapper.readValue(bytes, DataSegment.class);
+              return jsonMapper.readValue(bytes, typeReference);
             }
             catch (IOException e) {
               throw Throwables.propagate(e);
@@ -116,7 +108,7 @@ public class ServerInventoryView implements ServerView, InventoryView
           }

           @Override
-          public byte[] serializeInventory(DataSegment inventory)
+          public byte[] serializeInventory(InventoryType inventory)
           {
             try {
               return jsonMapper.writeValueAsBytes(inventory);
@@ -146,67 +138,27 @@ public class ServerInventoryView implements ServerView, InventoryView
           }

           @Override
-          public DruidServer addInventory(final DruidServer container, String inventoryKey, final DataSegment inventory)
+          public DruidServer addInventory(
+              final DruidServer container,
+              String inventoryKey,
+              final InventoryType inventory
+          )
           {
-            log.info("Server[%s] added segment[%s]", container.getName(), inventoryKey);
-
-            if (container.getSegment(inventoryKey) != null) {
-              log.warn(
-                  "Not adding or running callbacks for existing segment[%s] on server[%s]",
-                  inventoryKey,
-                  container.getName()
-              );
-
-              return container;
-            }
-
-            final DruidServer retVal = container.addDataSegment(inventoryKey, inventory);
-
-            runSegmentCallbacks(
-                new Function<SegmentCallback, CallbackAction>()
-                {
-                  @Override
-                  public CallbackAction apply(SegmentCallback input)
-                  {
-                    return input.segmentAdded(container, inventory);
-                  }
-                }
-            );
-
-            return retVal;
+            return addInnerInventory(container, inventoryKey, inventory);
+          }
+
+          @Override
+          public DruidServer updateInventory(
+              DruidServer container, String inventoryKey, InventoryType inventory
+          )
+          {
+            return updateInnerInventory(container, inventoryKey, inventory);
           }

           @Override
           public DruidServer removeInventory(final DruidServer container, String inventoryKey)
           {
-            log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
-
-            final DataSegment segment = container.getSegment(inventoryKey);
-
-            if (segment == null) {
-              log.warn(
-                  "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
-                  inventoryKey,
-                  container.getName()
-              );
-
-              return container;
-            }
-
-            final DruidServer retVal = container.removeDataSegment(inventoryKey);
-
-            runSegmentCallbacks(
-                new Function<SegmentCallback, CallbackAction>()
-                {
-                  @Override
-                  public CallbackAction apply(SegmentCallback input)
-                  {
-                    return input.segmentRemoved(container, segment);
-                  }
-                }
-            );
-
-            removedSegments.put(inventoryKey, config.getRemovedSegmentLifetime());
-            return retVal;
+            return removeInnerInventory(container, inventoryKey);
           }
         }
     );
@@ -282,7 +234,12 @@ public class ServerInventoryView implements ServerView, InventoryView
     segmentCallbacks.put(callback, exec);
   }

-  private void runSegmentCallbacks(
+  public InventoryManagerConfig getInventoryManagerConfig()
+  {
+    return inventoryManager.getConfig();
+  }
+
+  protected void runSegmentCallbacks(
       final Function<SegmentCallback, CallbackAction> fn
   )
   {
@@ -302,7 +259,7 @@ public class ServerInventoryView implements ServerView, InventoryView
     }
   }

-  private void runServerCallbacks(final DruidServer server)
+  protected void runServerCallbacks(final DruidServer server)
   {
     for (final Map.Entry<ServerCallback, Executor> entry : serverCallbacks.entrySet()) {
       entry.getValue().execute(
@@ -319,4 +276,83 @@ public class ServerInventoryView implements ServerView, InventoryView
       );
     }
   }
+
+  protected void addSingleInventory(
+      final DruidServer container,
+      final DataSegment inventory
+  )
+  {
+    log.info("Server[%s] added segment[%s]", container.getName(), inventory.getIdentifier());
+
+    if (container.getSegment(inventory.getIdentifier()) != null) {
+      log.warn(
+          "Not adding or running callbacks for existing segment[%s] on server[%s]",
+          inventory.getIdentifier(),
+          container.getName()
+      );
+
+      return;
+    }
+
+    container.addDataSegment(inventory.getIdentifier(), inventory);
+
+    runSegmentCallbacks(
+        new Function<SegmentCallback, CallbackAction>()
+        {
+          @Override
+          public CallbackAction apply(SegmentCallback input)
+          {
+            return input.segmentAdded(container, inventory);
+          }
+        }
+    );
+  }
+
+  protected void removeSingleInventory(final DruidServer container, String inventoryKey)
+  {
+    log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
+
+    final DataSegment segment = container.getSegment(inventoryKey);
+
+    if (segment == null) {
+      log.warn(
+          "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
+          inventoryKey,
+          container.getName()
+      );
+
+      return;
+    }
+
+    container.removeDataSegment(inventoryKey);
+
+    runSegmentCallbacks(
+        new Function<SegmentCallback, CallbackAction>()
+        {
+          @Override
+          public CallbackAction apply(SegmentCallback input)
+          {
+            return input.segmentRemoved(container, segment);
+          }
+        }
+    );
+
+    removedSegments.put(inventoryKey, config.getRemovedSegmentLifetime());
+  }
+
+  protected abstract DruidServer addInnerInventory(
+      final DruidServer container,
+      String inventoryKey,
+      final InventoryType inventory
+  );
+
+  protected abstract DruidServer updateInnerInventory(
+      final DruidServer container,
+      String inventoryKey,
+      final InventoryType inventory
+  );
+
+  protected abstract DruidServer removeInnerInventory(
+      final DruidServer container,
+      String inventoryKey
+  );
 }

ServerInventoryViewConfig.java

@@ -29,4 +29,8 @@ public abstract class ServerInventoryViewConfig
   @Config("druid.master.removedSegmentLifetime")
   @Default("1")
   public abstract int getRemovedSegmentLifetime();
+
+  @Config("druid.announcer.type")
+  @Default("legacy")
+  public abstract String getAnnouncerType();
 }

SingleServerInventoryView.java (new file)

@@ -0,0 +1,94 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.metamx.druid.curator.inventory.InventoryManagerConfig;
+import com.metamx.druid.initialization.ZkPathsConfig;
+import com.metamx.emitter.EmittingLogger;
+import org.apache.curator.framework.CuratorFramework;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ */
+public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
+{
+  private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class);
+
+  public SingleServerInventoryView(
+      final ServerInventoryViewConfig config,
+      final ZkPathsConfig zkPaths,
+      final CuratorFramework curator,
+      final ExecutorService exec,
+      final ObjectMapper jsonMapper
+  )
+  {
+    super(
+        config,
+        log,
+        new InventoryManagerConfig()
+        {
+          @Override
+          public String getContainerPath()
+          {
+            return zkPaths.getAnnouncementsPath();
+          }
+
+          @Override
+          public String getInventoryPath()
+          {
+            return zkPaths.getServedSegmentsPath();
+          }
+        },
+        curator,
+        exec,
+        jsonMapper,
+        new TypeReference<DataSegment>()
+        {
+        }
+    );
+  }
+
+  @Override
+  protected DruidServer addInnerInventory(
+      DruidServer container, String inventoryKey, DataSegment inventory
+  )
+  {
+    addSingleInventory(container, inventory);
+    return container;
+  }
+
+  @Override
+  protected DruidServer updateInnerInventory(
+      DruidServer container, String inventoryKey, DataSegment inventory
+  )
+  {
+    return addInnerInventory(container, inventoryKey, inventory);
+  }
+
+  @Override
+  protected DruidServer removeInnerInventory(DruidServer container, String inventoryKey)
+  {
+    removeSingleInventory(container, inventoryKey);
+    return container;
+  }
+}

BatchDataSegmentAnnouncer.java (renamed from BatchingDataSegmentAnnouncer.java)

@@ -39,9 +39,9 @@ import java.util.Set;

 /**
  */
-public class BatchingDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
+public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
 {
-  private static final Logger log = new Logger(BatchingDataSegmentAnnouncer.class);
+  private static final Logger log = new Logger(BatchDataSegmentAnnouncer.class);

   private final ZkDataSegmentAnnouncerConfig config;
   private final Announcer announcer;
@@ -51,7 +51,7 @@ public class BatchingDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
   private final Set<SegmentZNode> availableZNodes = Sets.newHashSet();
   private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap();

-  public BatchingDataSegmentAnnouncer(
+  public BatchDataSegmentAnnouncer(
       DruidServerMetadata server,
       ZkDataSegmentAnnouncerConfig config,
       Announcer announcer,

CuratorInventoryManager.java

@@ -135,6 +135,11 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
     }
   }

+  public InventoryManagerConfig getConfig()
+  {
+    return config;
+  }
+
   public ContainerClass getInventoryValue(String containerKey)
   {
     final ContainerHolder containerHolder = containers.get(containerKey);
@@ -290,11 +295,18 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
           switch (event.getType()) {
             case CHILD_ADDED:
-            case CHILD_UPDATED:
-              final InventoryClass inventory = strategy.deserializeInventory(child.getData());
+              final InventoryClass addedInventory = strategy.deserializeInventory(child.getData());

               synchronized (holder) {
-                holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, inventory));
+                holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, addedInventory));
+              }
+
+              break;
+            case CHILD_UPDATED:
+              final InventoryClass updatedInventory = strategy.deserializeInventory(child.getData());
+
+              synchronized (holder) {
+                holder.setContainer(strategy.updateInventory(holder.getContainer(), inventoryKey, updatedInventory));
               }

               break;

CuratorInventoryManagerStrategy.java

@@ -33,5 +33,6 @@ public interface CuratorInventoryManagerStrategy<ContainerClass, InventoryClass>
   public void deadContainer(ContainerClass deadContainer);
   public ContainerClass updateContainer(ContainerClass oldContainer, ContainerClass newContainer);

   public ContainerClass addInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
+  public ContainerClass updateInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
   public ContainerClass removeInventory(ContainerClass container, String inventoryKey);
 }

CuratorConfig.java

@@ -32,4 +32,8 @@ public abstract class CuratorConfig
   @Config("druid.zk.service.sessionTimeoutMs")
   @Default("30000")
   public abstract int getZkSessionTimeoutMs();
+
+  @Config("druid.curator.compression.enable")
+  @Default("false")
+  public abstract boolean enableCompression();
 }

Initialization.java

@@ -72,13 +72,13 @@ public class Initialization
   /**
    * Load properties.
    * Properties are layered:
-   *
+   * <p/>
    * # stored in zookeeper
    * # runtime.properties file,
    * # cmdLine -D
-   *
+   * <p/>
    * command line overrides runtime.properties which overrides zookeeper
-   *
+   * <p/>
    * Idempotent. Thread-safe. Properties are only loaded once.
    * If property druid.zk.service.host is not set then do not load properties from zookeeper.
    *
@@ -197,8 +197,7 @@ public class Initialization
             .connectString(curatorConfig.getZkHosts())
             .sessionTimeoutMs(curatorConfig.getZkSessionTimeoutMs())
             .retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
-            // Don't compress stuff written just yet, need to get code deployed first.
-            .compressionProvider(new PotentiallyGzippedCompressionProvider(false))
+            .compressionProvider(new PotentiallyGzippedCompressionProvider(curatorConfig.enableCompression()))
             .build();

     lifecycle.addHandler(
ZkDataSegmentAnnouncerConfig.java

@@ -14,4 +14,8 @@ public abstract class ZkDataSegmentAnnouncerConfig extends ZkPathsConfig
   @Config("druid.zk.maxNumBytesPerNode")
   @Default("512000")
   public abstract long getMaxNumBytes();
+
+  @Config("druid.announcer.type")
+  @Default("legacy")
+  public abstract String getAnnouncerType();
 }

BatchServerInventoryViewTest.java (new file)

@@ -0,0 +1,231 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.metamx.common.ISE;
+import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
+import com.metamx.druid.coordination.DruidServerMetadata;
+import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
+import com.metamx.druid.curator.announcement.Announcer;
+import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
+import com.metamx.druid.initialization.ZkPathsConfig;
+import com.metamx.druid.jackson.DefaultObjectMapper;
+import junit.framework.Assert;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingCluster;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class BatchServerInventoryViewTest
+{
+  private static final String testBasePath = "/test";
+  private static final Joiner joiner = Joiner.on("/");
+
+  private TestingCluster testingCluster;
+  private CuratorFramework cf;
+  private ObjectMapper jsonMapper;
+  private Announcer announcer;
+  private BatchDataSegmentAnnouncer segmentAnnouncer;
+  private Set<DataSegment> testSegments;
+  private BatchServerInventoryView batchServerInventoryView;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    testingCluster = new TestingCluster(1);
+    testingCluster.start();
+
+    cf = CuratorFrameworkFactory.builder()
+                                .connectString(testingCluster.getConnectString())
+                                .retryPolicy(new ExponentialBackoffRetry(1, 10))
+                                .compressionProvider(new PotentiallyGzippedCompressionProvider(true))
+                                .build();
+    cf.start();
+    cf.create().creatingParentsIfNeeded().forPath(testBasePath);
+
+    jsonMapper = new DefaultObjectMapper();
+
+    announcer = new Announcer(
+        cf,
+        MoreExecutors.sameThreadExecutor()
+    );
+    announcer.start();
+
+    segmentAnnouncer = new BatchDataSegmentAnnouncer(
+        new DruidServerMetadata(
+            "id",
+            "host",
+            Long.MAX_VALUE,
+            "type",
+            "tier"
+        ),
+        new ZkDataSegmentAnnouncerConfig()
+        {
+          @Override
+          public String getZkBasePath()
+          {
+            return testBasePath;
+          }
+
+          @Override
+          public int getSegmentsPerNode()
+          {
+            return 50;
+          }
+
+          @Override
+          public long getMaxNumBytes()
+          {
+            return 100000;
+          }
+
+          @Override
+          public String getAnnouncerType()
+          {
+            return "batch";
+          }
+        },
+        announcer,
+        jsonMapper
+    );
+    segmentAnnouncer.start();
+
+    testSegments = Sets.newHashSet();
+    for (int i = 0; i < 100; i++) {
+      testSegments.add(makeSegment(i));
+    }
+
+    batchServerInventoryView = new BatchServerInventoryView(
+        new ServerInventoryViewConfig()
+        {
+          @Override
+          public int getRemovedSegmentLifetime()
+          {
+            return 0;
+          }
+
+          @Override
+          public String getAnnouncerType()
+          {
+            return "batch";
+          }
+        },
+        new ZkPathsConfig()
+        {
+          @Override
+          public String getZkBasePath()
+          {
+            return testBasePath;
+          }
+        },
+        cf,
+        Executors.newSingleThreadExecutor(),
+        jsonMapper
+    );
+
+    batchServerInventoryView.start();
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    batchServerInventoryView.stop();
+    segmentAnnouncer.stop();
+    announcer.stop();
+    cf.close();
+    testingCluster.stop();
+  }
+
+  @Test
+  public void testRun() throws Exception
+  {
+    segmentAnnouncer.announceSegments(testSegments);
+
+    waitForSync();
+
+    DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
+    Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
+
+    Assert.assertEquals(testSegments, segments);
+
+    DataSegment segment1 = makeSegment(101);
+    DataSegment segment2 = makeSegment(102);
+
+    segmentAnnouncer.announceSegment(segment1);
+    segmentAnnouncer.announceSegment(segment2);
+    testSegments.add(segment1);
+    testSegments.add(segment2);
+
+    waitForSync();
+
+    Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
+
+    segmentAnnouncer.unannounceSegment(segment1);
+    segmentAnnouncer.unannounceSegment(segment2);
+    testSegments.remove(segment1);
+    testSegments.remove(segment2);
+
+    waitForSync();
+
+    Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
+  }
+
+  private DataSegment makeSegment(int offset)
+  {
+    return DataSegment.builder()
+                      .dataSource("foo")
+                      .interval(
+                          new Interval(
+                              new DateTime("2013-01-01").plusDays(offset),
+                              new DateTime("2013-01-02").plusDays(offset)
+                          )
+                      )
+                      .version(new DateTime().toString())
+                      .build();
+  }
+
+  private void waitForSync() throws Exception
+  {
+    Stopwatch stopwatch = new Stopwatch().start();
+    while (Iterables.get(batchServerInventoryView.getInventory(), 0).getSegments().size() != testSegments.size()) {
+      Thread.sleep(500);
+      if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 5000) {
+        throw new ISE("BatchServerInventoryView is not updating");
+      }
+    }
+  }
+}

BatchDataSegmentAnnouncerTest.java (renamed from BatchingDataSegmentAnnouncerTest.java)

@@ -47,7 +47,7 @@ import java.util.Set;

 /**
  */
-public class BatchingDataSegmentAnnouncerTest
+public class BatchDataSegmentAnnouncerTest
 {
   private static final String testBasePath = "/test";
   private static final String testSegmentsPath = "/test/segments/id";
@@ -58,7 +58,7 @@ public class BatchDataSegmentAnnouncerTest
   private ObjectMapper jsonMapper;
   private Announcer announcer;
   private SegmentReader segmentReader;
-  private BatchingDataSegmentAnnouncer segmentAnnouncer;
+  private BatchDataSegmentAnnouncer segmentAnnouncer;
   private Set<DataSegment> testSegments;

   @Before
@@ -84,7 +84,7 @@ public class BatchDataSegmentAnnouncerTest
     announcer.start();
     segmentReader = new SegmentReader(cf, jsonMapper);

-    segmentAnnouncer = new BatchingDataSegmentAnnouncer(
+    segmentAnnouncer = new BatchDataSegmentAnnouncer(
         new DruidServerMetadata(
             "id",
             "host",
@@ -111,6 +111,12 @@ public class BatchDataSegmentAnnouncerTest
           {
             return 100000;
           }
+
+          @Override
+          public String getAnnouncerType()
+          {
+            return "batch";
+          }
         },
         announcer,
         jsonMapper

CuratorInventoryManagerTest.java

@@ -209,6 +209,14 @@ public class CuratorInventoryManagerTest extends CuratorTestBase
         return container;
       }

+      @Override
+      public Map<String, Integer> updateInventory(
+          Map<String, Integer> container, String inventoryKey, Integer inventory
+      )
+      {
+        return addInventory(container, inventoryKey, inventory);
+      }
+
       @Override
       public Map<String, Integer> removeInventory(Map<String, Integer> container, String inventoryKey)
       {

SpatialDimensionRowFormatter.java

@@ -143,10 +143,7 @@ public class SpatialDimensionRowFormatter
       @Override
       public String toString()
       {
-        return "InputRow{" +
-               "timestamp=" + row.getTimestampFromEpoch() +
-               ", dimensions=" + row.getDimensions() +
-               '}';
+        return row.toString();
       }
     };

ZkCoordinator.java

@@ -50,14 +50,12 @@ public class ZkCoordinator implements DataSegmentChangeHandler
   private final ObjectMapper jsonMapper;
   private final ZkCoordinatorConfig config;
+  private final ZkPathsConfig zkPaths;
   private final DruidServerMetadata me;
   private final DataSegmentAnnouncer announcer;
   private final CuratorFramework curator;
   private final ServerManager serverManager;
-
-  private final String loadQueueLocation;
-  private final String servedSegmentsLocation;

   private volatile PathChildrenCache loadQueueCache;
   private volatile boolean started;
@@ -73,13 +71,11 @@ public class ZkCoordinator implements DataSegmentChangeHandler
   {
     this.jsonMapper = jsonMapper;
     this.config = config;
+    this.zkPaths = zkPaths;
     this.me = me;
     this.announcer = announcer;
     this.curator = curator;
     this.serverManager = serverManager;
-
-    this.loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
-    this.servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
   }

   @LifecycleStart
@LifecycleStart @LifecycleStart
@ -91,6 +87,10 @@ public class ZkCoordinator implements DataSegmentChangeHandler
return; return;
} }
final String loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
final String servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName());
loadQueueCache = new PathChildrenCache( loadQueueCache = new PathChildrenCache(
curator, curator,
loadQueueLocation, loadQueueLocation,
@@ -104,6 +104,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
     curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
     curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
+    curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());

     if (config.isLoadFromSegmentCacheEnabled()) {
       loadCache();

MasterMain.java

@@ -27,13 +27,16 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.servlet.GuiceFilter;
+import com.metamx.common.IAE;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.common.concurrent.ScheduledExecutors;
 import com.metamx.common.config.Config;
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.common.logger.Logger;
+import com.metamx.druid.client.BatchServerInventoryView;
 import com.metamx.druid.client.ServerInventoryView;
 import com.metamx.druid.client.ServerInventoryViewConfig;
+import com.metamx.druid.client.SingleServerInventoryView;
 import com.metamx.druid.client.indexing.IndexingServiceClient;
 import com.metamx.druid.concurrent.Execs;
 import com.metamx.druid.config.ConfigManager;
@@ -131,9 +134,31 @@ public class MasterMain
     final ExecutorService exec = Executors.newFixedThreadPool(
         1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build()
     );
-    ServerInventoryView serverInventoryView = new ServerInventoryView(
-        configFactory.build(ServerInventoryViewConfig.class), zkPaths, curatorFramework, exec, jsonMapper
-    );
+
+    final ServerInventoryViewConfig serverInventoryViewConfig = configFactory.build(ServerInventoryViewConfig.class);
+    final String announcerType = serverInventoryViewConfig.getAnnouncerType();
+
+    final ServerInventoryView serverInventoryView;
+    if ("legacy".equalsIgnoreCase(announcerType)) {
+      serverInventoryView = new SingleServerInventoryView(
+          serverInventoryViewConfig,
+          zkPaths,
+          curatorFramework,
+          exec,
+          jsonMapper
+      );
+    } else if ("batch".equalsIgnoreCase(announcerType)) {
+      serverInventoryView = new BatchServerInventoryView(
+          serverInventoryViewConfig,
+          zkPaths,
+          curatorFramework,
+          exec,
+          jsonMapper
+      );
+    } else {
+      throw new IAE("Unknown type %s", announcerType);
+    }
+
     lifecycle.addManagedInstance(serverInventoryView);

     final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);

DruidMaster.java

@@ -85,7 +85,7 @@ public class DruidMaster
   private final ZkPathsConfig zkPaths;
   private final JacksonConfigManager configManager;
   private final DatabaseSegmentManager databaseSegmentManager;
-  private final ServerInventoryView serverInventoryView;
+  private final ServerInventoryView<Object> serverInventoryView;
   private final DatabaseRuleManager databaseRuleManager;
   private final CuratorFramework curator;
   private final ServiceEmitter emitter;
@@ -291,7 +291,7 @@ public class DruidMaster
       final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName);
       final String toServedSegPath = ZKPaths.makePath(
-          ZKPaths.makePath(zkPaths.getServedSegmentsPath(), to), segmentName
+          ZKPaths.makePath(serverInventoryView.getInventoryManagerConfig().getInventoryPath(), to), segmentName
       );

       loadPeon.loadSegment(

DruidSetup.java

@@ -296,6 +296,7 @@ public class DruidSetup
     createPath(curator, zkPaths.getMasterPath(), out);
     createPath(curator, zkPaths.getLoadQueuePath(), out);
     createPath(curator, zkPaths.getServedSegmentsPath(), out);
+    createPath(curator, zkPaths.getLiveSegmentsPath(), out);
     createPath(curator, zkPaths.getPropertiesPath(), out);
   }

DruidMasterTest.java

@@ -23,7 +23,8 @@ import com.google.common.collect.MapMaker;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.client.DruidServer;
-import com.metamx.druid.client.ServerInventoryView;
+import com.metamx.druid.client.SingleServerInventoryView;
+import com.metamx.druid.curator.inventory.InventoryManagerConfig;
 import com.metamx.druid.db.DatabaseSegmentManager;
 import com.metamx.druid.initialization.ZkPathsConfig;
 import com.metamx.druid.metrics.NoopServiceEmitter;
@@ -44,7 +45,7 @@ public class DruidMasterTest
   private CuratorFramework curator;
   private LoadQueueTaskMaster taskMaster;
   private DatabaseSegmentManager databaseSegmentManager;
-  private ServerInventoryView serverInventoryView;
+  private SingleServerInventoryView serverInventoryView;
   private ScheduledExecutorFactory scheduledExecutorFactory;
   private DruidServer druidServer;
   private DataSegment segment;
@@ -58,7 +59,7 @@ public class DruidMasterTest
     segment = EasyMock.createNiceMock(DataSegment.class);
     loadQueuePeon = EasyMock.createNiceMock(LoadQueuePeon.class);
     loadManagementPeons = new MapMaker().makeMap();
-    serverInventoryView = EasyMock.createMock(ServerInventoryView.class);
+    serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);

     databaseSegmentManager = EasyMock.createNiceMock(DatabaseSegmentManager.class);
     EasyMock.replay(databaseSegmentManager);
@@ -169,6 +170,20 @@ public class DruidMasterTest
     EasyMock.expect(serverInventoryView.getInventoryValue("from")).andReturn(druidServer);
     EasyMock.expect(serverInventoryView.getInventoryValue("to")).andReturn(druidServer);
+    EasyMock.expect(serverInventoryView.getInventoryManagerConfig()).andReturn(new InventoryManagerConfig()
+    {
+      @Override
+      public String getContainerPath()
+      {
+        return "";
+      }
+
+      @Override
+      public String getInventoryPath()
+      {
+        return "";
+      }
+    });
     EasyMock.replay(serverInventoryView);

     master.moveSegment("from", "to", "dummySegment", null);