Merge branch 'master' into worker-resource

Conflicts:
	pom.xml
fjy 2013-07-25 16:18:42 -07:00
commit 217894d44a
57 changed files with 1942 additions and 456 deletions

View File

@@ -10,8 +10,6 @@ SCRIPT_DIR=`pwd`
popd
VERSION=`cat pom.xml | grep version | head -4 | tail -1 | sed 's_.*<version>\([^<]*\)</version>.*_\1_'`
#TAR_FILE=${SCRIPT_DIR}/${PROJECT}-${VERSION}.tar.gz
#rm -f ${TAR_FILE}
echo Using Version[${VERSION}]

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.6-SNAPSHOT</version>
<version>0.5.20-SNAPSHOT</version>
</parent>
<dependencies>
@@ -68,6 +68,10 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>

View File

@@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
@@ -33,18 +34,20 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.BatchServerInventoryView;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.client.SingleServerInventoryView;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer;
import com.metamx.druid.coordination.BatchingCuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
import com.metamx.druid.coordination.SingleDataSegmentAnnouncer;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.http.NoopRequestLogger;
import com.metamx.druid.http.RequestLogger;
@@ -357,13 +360,29 @@ public abstract class QueryableNode<T extends QueryableNode> extends RegisteringNode
final ExecutorService exec = Executors.newFixedThreadPool(
1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build()
);
serverInventoryView = new ServerInventoryView(
getConfigFactory().build(ServerInventoryViewConfig.class),
getZkPaths(),
getCuratorFramework(),
exec,
getJsonMapper()
);
final ServerInventoryViewConfig serverInventoryViewConfig = getConfigFactory().build(ServerInventoryViewConfig.class);
final String announcerType = serverInventoryViewConfig.getAnnouncerType();
if ("legacy".equalsIgnoreCase(announcerType)) {
serverInventoryView = new SingleServerInventoryView(
serverInventoryViewConfig,
getZkPaths(),
getCuratorFramework(),
exec,
getJsonMapper()
);
} else if ("batch".equalsIgnoreCase(announcerType)) {
serverInventoryView = new BatchServerInventoryView(
serverInventoryViewConfig,
getZkPaths(),
getCuratorFramework(),
exec,
getJsonMapper()
);
} else {
throw new IAE("Unknown type %s", announcerType);
}
lifecycle.addManagedInstance(serverInventoryView);
}
}
@@ -373,18 +392,21 @@ public abstract class QueryableNode<T extends QueryableNode> extends RegisteringNode
if (requestLogger == null) {
try {
final String loggingType = props.getProperty("druid.request.logging.type");
if("emitter".equals(loggingType)) {
setRequestLogger(Initialization.makeEmittingRequestLogger(
getProps(),
getEmitter()
));
}
else if ("file".equalsIgnoreCase(loggingType)) {
setRequestLogger(Initialization.makeFileRequestLogger(
getJsonMapper(),
getScheduledExecutorFactory(),
getProps()
));
if ("emitter".equals(loggingType)) {
setRequestLogger(
Initialization.makeEmittingRequestLogger(
getProps(),
getEmitter()
)
);
} else if ("file".equalsIgnoreCase(loggingType)) {
setRequestLogger(
Initialization.makeFileRequestLogger(
getJsonMapper(),
getScheduledExecutorFactory(),
getProps()
)
);
} else {
setRequestLogger(new NoopRequestLogger());
}
@@ -428,19 +450,39 @@ public abstract class QueryableNode<T extends QueryableNode> extends RegisteringNode
final Announcer announcer = new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s"));
lifecycle.addManagedInstance(announcer);
setAnnouncer(
new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
Arrays.<AbstractDataSegmentAnnouncer>asList(
new BatchingCuratorDataSegmentAnnouncer(
getDruidServerMetadata(),
getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class),
announcer,
getJsonMapper()
),
new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper())
)
)
);
final ZkDataSegmentAnnouncerConfig config = getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class);
final String announcerType = config.getAnnouncerType();
final DataSegmentAnnouncer dataSegmentAnnouncer;
if ("batch".equalsIgnoreCase(announcerType)) {
dataSegmentAnnouncer = new BatchDataSegmentAnnouncer(
getDruidServerMetadata(),
config,
announcer,
getJsonMapper()
);
} else if ("legacy".equalsIgnoreCase(announcerType)) {
dataSegmentAnnouncer = new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
Arrays.<AbstractDataSegmentAnnouncer>asList(
new BatchDataSegmentAnnouncer(
getDruidServerMetadata(),
config,
announcer,
getJsonMapper()
),
new SingleDataSegmentAnnouncer(
getDruidServerMetadata(),
getZkPaths(),
announcer,
getJsonMapper()
)
)
);
} else {
throw new ISE("Unknown announcer type [%s]", announcerType);
}
setAnnouncer(dataSegmentAnnouncer);
lifecycle.addManagedInstance(getAnnouncer(), Lifecycle.Stage.LAST);
}

View File

@@ -0,0 +1,129 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import com.metamx.druid.curator.inventory.InventoryManagerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.emitter.EmittingLogger;
import org.apache.curator.framework.CuratorFramework;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
/**
*/
public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>>
{
private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);
final ConcurrentMap<String, Set<DataSegment>> zNodes = new MapMaker().makeMap();
public BatchServerInventoryView(
final ServerInventoryViewConfig config,
final ZkPathsConfig zkPaths,
final CuratorFramework curator,
final ExecutorService exec,
final ObjectMapper jsonMapper
)
{
super(
config,
log,
new InventoryManagerConfig()
{
@Override
public String getContainerPath()
{
return zkPaths.getAnnouncementsPath();
}
@Override
public String getInventoryPath()
{
return zkPaths.getLiveSegmentsPath();
}
},
curator,
exec,
jsonMapper,
new TypeReference<Set<DataSegment>>()
{
}
);
}
@Override
protected DruidServer addInnerInventory(
final DruidServer container,
String inventoryKey,
final Set<DataSegment> inventory
)
{
zNodes.put(inventoryKey, inventory);
for (DataSegment segment : inventory) {
addSingleInventory(container, segment);
}
return container;
}
@Override
protected DruidServer updateInnerInventory(
DruidServer container, String inventoryKey, Set<DataSegment> inventory
)
{
Set<DataSegment> existing = zNodes.get(inventoryKey);
if (existing == null) {
throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey);
}
for (DataSegment segment : Sets.difference(inventory, existing)) {
addSingleInventory(container, segment);
}
for (DataSegment segment : Sets.difference(existing, inventory)) {
removeSingleInventory(container, segment.getIdentifier());
}
zNodes.put(inventoryKey, inventory);
return container;
}
@Override
protected DruidServer removeInnerInventory(final DruidServer container, String inventoryKey)
{
log.info("Server[%s] removed container[%s]", container.getName(), inventoryKey);
Set<DataSegment> segments = zNodes.remove(inventoryKey);
if (segments == null) {
log.warn("Told to remove container[%s], which didn't exist", inventoryKey);
return container;
}
for (DataSegment segment : segments) {
removeSingleInventory(container, segment.getIdentifier());
}
return container;
}
}
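The delta logic in updateInnerInventory above is the heart of the batch view: Guava's Sets.difference yields the segments that are new to the znode and the segments that have disappeared from it. A minimal standalone sketch of that computation (illustrative segment names, not from the patch):
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.util.Set;
public class SegmentDeltaExample
{
  public static void main(String[] args)
  {
    Set<String> existing = ImmutableSet.of("seg-1", "seg-2", "seg-3");
    Set<String> updated = ImmutableSet.of("seg-2", "seg-3", "seg-4");
    // In the update but not seen before: these go to addSingleInventory.
    System.out.println(Sets.difference(updated, existing)); // [seg-4]
    // Seen before but missing from the update: these go to removeSingleInventory.
    System.out.println(Sets.difference(existing, updated)); // [seg-1]
  }
}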

View File

@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import com.metamx.druid.coordination.DruidServerMetadata;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -117,7 +118,8 @@ public class DruidServer implements Comparable
@JsonProperty
public Map<String, DataSegment> getSegments()
{
return ImmutableMap.copyOf(segments);
// Copying the map slows things down a lot here; don't use an ImmutableMap here
return Collections.unmodifiableMap(segments);
}
public DataSegment getSegment(String segmentName)

View File

@@ -20,16 +20,17 @@
package com.metamx.druid.client;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.MapMaker;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.inventory.CuratorInventoryManager;
import com.metamx.druid.curator.inventory.CuratorInventoryManagerStrategy;
import com.metamx.druid.curator.inventory.InventoryManagerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.emitter.EmittingLogger;
import org.apache.curator.framework.CuratorFramework;
@@ -43,44 +44,35 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
*/
public class ServerInventoryView implements ServerView, InventoryView
public abstract class ServerInventoryView<InventoryType> implements ServerView, InventoryView
{
private static final EmittingLogger log = new EmittingLogger(ServerInventoryView.class);
private final CuratorInventoryManager<DruidServer, DataSegment> inventoryManager;
private final ServerInventoryViewConfig config;
private final Logger log;
private final CuratorInventoryManager<DruidServer, InventoryType> inventoryManager;
private final AtomicBoolean started = new AtomicBoolean(false);
private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new MapMaker().makeMap();
private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new MapMaker().makeMap();
private static final Map<String, Integer> removedSegments = new MapMaker().makeMap();
private final Map<String, Integer> removedSegments = new MapMaker().makeMap();
public ServerInventoryView(
final ServerInventoryViewConfig config,
final ZkPathsConfig zkPaths,
final Logger log,
final InventoryManagerConfig inventoryManagerConfig,
final CuratorFramework curator,
final ExecutorService exec,
final ObjectMapper jsonMapper
final ObjectMapper jsonMapper,
final TypeReference<InventoryType> typeReference
)
{
inventoryManager = new CuratorInventoryManager<DruidServer, DataSegment>(
this.config = config;
this.log = log;
this.inventoryManager = new CuratorInventoryManager<DruidServer, InventoryType>(
curator,
new InventoryManagerConfig()
{
@Override
public String getContainerPath()
{
return zkPaths.getAnnouncementsPath();
}
@Override
public String getInventoryPath()
{
return zkPaths.getServedSegmentsPath();
}
},
inventoryManagerConfig,
exec,
new CuratorInventoryManagerStrategy<DruidServer, DataSegment>()
new CuratorInventoryManagerStrategy<DruidServer, InventoryType>()
{
@Override
public DruidServer deserializeContainer(byte[] bytes)
@@ -105,10 +97,10 @@ public class ServerInventoryView implements ServerView, InventoryView
}
@Override
public DataSegment deserializeInventory(byte[] bytes)
public InventoryType deserializeInventory(byte[] bytes)
{
try {
return jsonMapper.readValue(bytes, DataSegment.class);
return jsonMapper.readValue(bytes, typeReference);
}
catch (IOException e) {
throw Throwables.propagate(e);
@@ -116,7 +108,7 @@ public class ServerInventoryView implements ServerView, InventoryView
}
@Override
public byte[] serializeInventory(DataSegment inventory)
public byte[] serializeInventory(InventoryType inventory)
{
try {
return jsonMapper.writeValueAsBytes(inventory);
@@ -146,67 +138,27 @@ public class ServerInventoryView implements ServerView, InventoryView
}
@Override
public DruidServer addInventory(final DruidServer container, String inventoryKey, final DataSegment inventory)
public DruidServer addInventory(
final DruidServer container,
String inventoryKey,
final InventoryType inventory
)
{
log.info("Server[%s] added segment[%s]", container.getName(), inventoryKey);
return addInnerInventory(container, inventoryKey, inventory);
}
if (container.getSegment(inventoryKey) != null) {
log.warn(
"Not adding or running callbacks for existing segment[%s] on server[%s]",
inventoryKey,
container.getName()
);
return container;
}
final DruidServer retVal = container.addDataSegment(inventoryKey, inventory);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentAdded(container, inventory);
}
}
);
return retVal;
@Override
public DruidServer updateInventory(
DruidServer container, String inventoryKey, InventoryType inventory
)
{
return updateInnerInventory(container, inventoryKey, inventory);
}
@Override
public DruidServer removeInventory(final DruidServer container, String inventoryKey)
{
log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
final DataSegment segment = container.getSegment(inventoryKey);
if (segment == null) {
log.warn(
"Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
inventoryKey,
container.getName()
);
return container;
}
final DruidServer retVal = container.removeDataSegment(inventoryKey);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentRemoved(container, segment);
}
}
);
removedSegments.put(inventoryKey, config.getRemovedSegmentLifetime());
return retVal;
return removeInnerInventory(container, inventoryKey);
}
}
);
@@ -282,7 +234,12 @@ public class ServerInventoryView implements ServerView, InventoryView
segmentCallbacks.put(callback, exec);
}
private void runSegmentCallbacks(
public InventoryManagerConfig getInventoryManagerConfig()
{
return inventoryManager.getConfig();
}
protected void runSegmentCallbacks(
final Function<SegmentCallback, CallbackAction> fn
)
{
@@ -302,7 +259,7 @@ public class ServerInventoryView implements ServerView, InventoryView
}
}
private void runServerCallbacks(final DruidServer server)
protected void runServerCallbacks(final DruidServer server)
{
for (final Map.Entry<ServerCallback, Executor> entry : serverCallbacks.entrySet()) {
entry.getValue().execute(
@@ -319,4 +276,83 @@ public class ServerInventoryView implements ServerView, InventoryView
);
}
}
protected void addSingleInventory(
final DruidServer container,
final DataSegment inventory
)
{
log.info("Server[%s] added segment[%s]", container.getName(), inventory.getIdentifier());
if (container.getSegment(inventory.getIdentifier()) != null) {
log.warn(
"Not adding or running callbacks for existing segment[%s] on server[%s]",
inventory.getIdentifier(),
container.getName()
);
return;
}
container.addDataSegment(inventory.getIdentifier(), inventory);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentAdded(container, inventory);
}
}
);
}
protected void removeSingleInventory(final DruidServer container, String inventoryKey)
{
log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
final DataSegment segment = container.getSegment(inventoryKey);
if (segment == null) {
log.warn(
"Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
inventoryKey,
container.getName()
);
return;
}
container.removeDataSegment(inventoryKey);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentRemoved(container, segment);
}
}
);
removedSegments.put(inventoryKey, config.getRemovedSegmentLifetime());
}
protected abstract DruidServer addInnerInventory(
final DruidServer container,
String inventoryKey,
final InventoryType inventory
);
protected abstract DruidServer updateInnerInventory(
final DruidServer container,
String inventoryKey,
final InventoryType inventory
);
protected abstract DruidServer removeInnerInventory(
final DruidServer container,
String inventoryKey
);
}
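The point of the TypeReference parameter threaded through this refactor: Jackson cannot recover erased generic types at runtime, so each subclass hands the base class a reified description of its znode payload (a whole Set<DataSegment> for the batch view, a single DataSegment for the legacy one). A small sketch of the mechanism, with illustrative types rather than the patch's:
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Set;
public class TypeReferenceExample
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    byte[] bytes = "[1, 2, 3]".getBytes();
    // The anonymous TypeReference subclass preserves Set<Integer> through
    // type erasure, so readValue knows which container to build.
    Set<Integer> values = mapper.readValue(bytes, new TypeReference<Set<Integer>>() {});
    System.out.println(values); // [1, 2, 3]
  }
}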

View File

@@ -29,4 +29,8 @@ public abstract class ServerInventoryViewConfig
@Config("druid.master.removedSegmentLifetime")
@Default("1")
public abstract int getRemovedSegmentLifetime();
@Config("druid.announcer.type")
@Default("legacy")
public abstract String getAnnouncerType();
}
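A minimal sketch (assumed usage, not part of the patch) of how this property is wired up: config-magic builds the abstract config class from runtime properties, so setting druid.announcer.type selects the batch announcer while the default stays "legacy".
import java.util.Properties;
import org.skife.config.ConfigurationObjectFactory;
public class AnnouncerTypeExample
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.setProperty("druid.announcer.type", "batch"); // opt in to the batch announcer
    ServerInventoryViewConfig config =
        new ConfigurationObjectFactory(props).build(ServerInventoryViewConfig.class);
    System.out.println(config.getAnnouncerType()); // "batch"
  }
}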

View File

@@ -0,0 +1,94 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.curator.inventory.InventoryManagerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.emitter.EmittingLogger;
import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.ExecutorService;
/**
*/
public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
{
private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class);
public SingleServerInventoryView(
final ServerInventoryViewConfig config,
final ZkPathsConfig zkPaths,
final CuratorFramework curator,
final ExecutorService exec,
final ObjectMapper jsonMapper
)
{
super(
config,
log,
new InventoryManagerConfig()
{
@Override
public String getContainerPath()
{
return zkPaths.getAnnouncementsPath();
}
@Override
public String getInventoryPath()
{
return zkPaths.getServedSegmentsPath();
}
},
curator,
exec,
jsonMapper,
new TypeReference<DataSegment>()
{
}
);
}
@Override
protected DruidServer addInnerInventory(
DruidServer container, String inventoryKey, DataSegment inventory
)
{
addSingleInventory(container, inventory);
return container;
}
@Override
protected DruidServer updateInnerInventory(
DruidServer container, String inventoryKey, DataSegment inventory
)
{
return addInnerInventory(container, inventoryKey, inventory);
}
@Override
protected DruidServer removeInnerInventory(DruidServer container, String inventoryKey)
{
removeSingleInventory(container, inventoryKey);
return container;
}
}

View File

@@ -86,7 +86,7 @@ public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnouncer
return;
}
log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config);
log.info("Stopping %s with config[%s]", getClass(), config);
announcer.unannounce(makeAnnouncementPath());
started = false;

View File

@@ -22,33 +22,26 @@ package com.metamx.druid.coordination;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
*/
public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
{
private static final Logger log = new Logger(BatchingCuratorDataSegmentAnnouncer.class);
private static final Logger log = new Logger(BatchDataSegmentAnnouncer.class);
private final ZkDataSegmentAnnouncerConfig config;
private final Announcer announcer;
@@ -58,7 +51,7 @@ public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
private final Set<SegmentZNode> availableZNodes = Sets.newHashSet();
private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap();
public BatchingCuratorDataSegmentAnnouncer(
public BatchDataSegmentAnnouncer(
DruidServerMetadata server,
ZkDataSegmentAnnouncerConfig config,
Announcer announcer,

View File

@@ -28,15 +28,15 @@ import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
public class CuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
public class SingleDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
{
private static final Logger log = new Logger(CuratorDataSegmentAnnouncer.class);
private static final Logger log = new Logger(SingleDataSegmentAnnouncer.class);
private final Announcer announcer;
private final ObjectMapper jsonMapper;
private final String servedSegmentsLocation;
public CuratorDataSegmentAnnouncer(
public SingleDataSegmentAnnouncer(
DruidServerMetadata server,
ZkPathsConfig config,
Announcer announcer,

View File

@@ -48,6 +48,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
@@ -64,6 +65,7 @@ public class Announcer
private final List<Pair<String, byte[]>> toAnnounce = Lists.newArrayList();
private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap();
private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new MapMaker().makeMap();
private final List<String> parentsIBuilt = new CopyOnWriteArrayList<String>();
private boolean started = false;
@@ -114,6 +116,15 @@ public class Announcer
unannounce(ZKPaths.makePath(basePath, announcementPath));
}
}
for (String parent : parentsIBuilt) {
try {
curator.delete().forPath(parent);
}
catch (Exception e) {
log.info(e, "Unable to delete parent[%s], boooo.", parent);
}
}
}
}
@@ -136,10 +147,19 @@
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final String parentPath = pathAndNode.getPath();
boolean buildParentPath = false;
ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
if (subPaths == null) {
try {
if (curator.checkExists().forPath(parentPath) == null) {
buildParentPath = true;
}
}
catch (Exception e) {
log.debug(e, "Problem checking if the parent existed, ignoring.");
}
// I don't have a watcher on this path yet, create a Map and start watching.
announcements.putIfAbsent(parentPath, new MapMaker().<String, byte[]>makeMap());
@@ -208,17 +228,15 @@
}
);
try {
synchronized (toAnnounce) {
if (started) {
cache.start();
listeners.put(parentPath, cache);
synchronized (toAnnounce) {
if (started) {
if (buildParentPath) {
createPath(parentPath);
}
startCache(cache);
listeners.put(parentPath, cache);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
@@ -261,7 +279,7 @@
throw new ISE("Cannot update a path[%s] that hasn't been announced!", path);
}
synchronized (subPaths) {
synchronized (toAnnounce) {
try {
byte[] oldBytes = subPaths.get(nodePath);
@@ -320,4 +338,26 @@
throw Throwables.propagate(e);
}
}
private void startCache(PathChildrenCache cache)
{
try {
cache.start();
}
catch (Exception e) {
Closeables.closeQuietly(cache);
throw Throwables.propagate(e);
}
}
private void createPath(String parentPath)
{
try {
curator.create().creatingParentsIfNeeded().forPath(parentPath);
parentsIBuilt.add(parentPath);
}
catch (Exception e) {
log.info(e, "Problem creating parentPath[%s], someone else created it first?", parentPath);
}
}
}

View File

@@ -135,6 +135,11 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
}
}
public InventoryManagerConfig getConfig()
{
return config;
}
public ContainerClass getInventoryValue(String containerKey)
{
final ContainerHolder containerHolder = containers.get(containerKey);
@@ -290,11 +295,18 @@
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED:
final InventoryClass inventory = strategy.deserializeInventory(child.getData());
final InventoryClass addedInventory = strategy.deserializeInventory(child.getData());
synchronized (holder) {
holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, inventory));
holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, addedInventory));
}
break;
case CHILD_UPDATED:
final InventoryClass updatedInventory = strategy.deserializeInventory(child.getData());
synchronized (holder) {
holder.setContainer(strategy.updateInventory(holder.getContainer(), inventoryKey, updatedInventory));
}
break;

View File

@@ -33,5 +33,6 @@ public interface CuratorInventoryManagerStrategy<ContainerClass, InventoryClass>
public void deadContainer(ContainerClass deadContainer);
public ContainerClass updateContainer(ContainerClass oldContainer, ContainerClass newContainer);
public ContainerClass addInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
public ContainerClass updateInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
public ContainerClass removeInventory(ContainerClass container, String inventoryKey);
}

View File

@@ -32,4 +32,8 @@ public abstract class CuratorConfig
@Config("druid.zk.service.sessionTimeoutMs")
@Default("30000")
public abstract int getZkSessionTimeoutMs();
@Config("druid.curator.compression.enable")
@Default("false")
public abstract boolean enableCompression();
}

View File

@@ -72,13 +72,13 @@ public class Initialization
/**
* Load properties.
* Properties are layered:
*
* <p/>
* # stored in zookeeper
* # runtime.properties file,
* # cmdLine -D
*
* <p/>
* command line overrides runtime.properties which overrides zookeeper
*
* <p/>
* Idempotent. Thread-safe. Properties are only loaded once.
* If property druid.zk.service.host is not set then do not load properties from zookeeper.
*
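The layering described in this javadoc can be illustrated with plain java.util.Properties defaults chaining (a sketch of the override order only, not the code Initialization actually uses):
import java.util.Properties;
public class PropertyLayeringExample
{
  public static void main(String[] args)
  {
    Properties zkProps = new Properties();
    zkProps.setProperty("druid.host", "from-zookeeper");
    Properties fileProps = new Properties(zkProps); // runtime.properties layered over zookeeper
    fileProps.setProperty("druid.host", "from-runtime-properties");
    Properties cmdLineProps = new Properties(fileProps); // -D flags layered over the file
    cmdLineProps.setProperty("druid.host", "from-command-line");
    System.out.println(cmdLineProps.getProperty("druid.host")); // from-command-line
  }
}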
@@ -196,10 +196,9 @@ public class Initialization
CuratorFrameworkFactory.builder()
.connectString(curatorConfig.getZkHosts())
.sessionTimeoutMs(curatorConfig.getZkSessionTimeoutMs())
.retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
// Don't compress stuff written just yet, need to get code deployed first.
.compressionProvider(new PotentiallyGzippedCompressionProvider(false))
.build();
.retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
.compressionProvider(new PotentiallyGzippedCompressionProvider(curatorConfig.enableCompression()))
.build();
lifecycle.addHandler(
new Lifecycle.Handler()
@@ -335,9 +334,9 @@
}
public static RequestLogger makeFileRequestLogger(
ObjectMapper objectMapper,
ScheduledExecutorFactory factory,
Properties props
ObjectMapper objectMapper,
ScheduledExecutorFactory factory,
Properties props
) throws IOException
{
return new FileRequestLogger(

View File

@@ -14,4 +14,8 @@ public abstract class ZkDataSegmentAnnouncerConfig extends ZkPathsConfig
@Config("druid.zk.maxNumBytesPerNode")
@Default("512000")
public abstract long getMaxNumBytes();
@Config("druid.announcer.type")
@Default("legacy")
public abstract String getAnnouncerType();
}

View File

@@ -0,0 +1,231 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
*/
public class BatchServerInventoryViewTest
{
private static final String testBasePath = "/test";
private static final Joiner joiner = Joiner.on("/");
private TestingCluster testingCluster;
private CuratorFramework cf;
private ObjectMapper jsonMapper;
private Announcer announcer;
private BatchDataSegmentAnnouncer segmentAnnouncer;
private Set<DataSegment> testSegments;
private BatchServerInventoryView batchServerInventoryView;
@Before
public void setUp() throws Exception
{
testingCluster = new TestingCluster(1);
testingCluster.start();
cf = CuratorFrameworkFactory.builder()
.connectString(testingCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.compressionProvider(new PotentiallyGzippedCompressionProvider(true))
.build();
cf.start();
cf.create().creatingParentsIfNeeded().forPath(testBasePath);
jsonMapper = new DefaultObjectMapper();
announcer = new Announcer(
cf,
MoreExecutors.sameThreadExecutor()
);
announcer.start();
segmentAnnouncer = new BatchDataSegmentAnnouncer(
new DruidServerMetadata(
"id",
"host",
Long.MAX_VALUE,
"type",
"tier"
),
new ZkDataSegmentAnnouncerConfig()
{
@Override
public String getZkBasePath()
{
return testBasePath;
}
@Override
public int getSegmentsPerNode()
{
return 50;
}
@Override
public long getMaxNumBytes()
{
return 100000;
}
@Override
public String getAnnouncerType()
{
return "batch";
}
},
announcer,
jsonMapper
);
segmentAnnouncer.start();
testSegments = Sets.newHashSet();
for (int i = 0; i < 100; i++) {
testSegments.add(makeSegment(i));
}
batchServerInventoryView = new BatchServerInventoryView(
new ServerInventoryViewConfig()
{
@Override
public int getRemovedSegmentLifetime()
{
return 0;
}
@Override
public String getAnnouncerType()
{
return "batch";
}
},
new ZkPathsConfig()
{
@Override
public String getZkBasePath()
{
return testBasePath;
}
},
cf,
Executors.newSingleThreadExecutor(),
jsonMapper
);
batchServerInventoryView.start();
}
@After
public void tearDown() throws Exception
{
batchServerInventoryView.stop();
segmentAnnouncer.stop();
announcer.stop();
cf.close();
testingCluster.stop();
}
@Test
public void testRun() throws Exception
{
segmentAnnouncer.announceSegments(testSegments);
waitForSync();
DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
Assert.assertEquals(testSegments, segments);
DataSegment segment1 = makeSegment(101);
DataSegment segment2 = makeSegment(102);
segmentAnnouncer.announceSegment(segment1);
segmentAnnouncer.announceSegment(segment2);
testSegments.add(segment1);
testSegments.add(segment2);
waitForSync();
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
segmentAnnouncer.unannounceSegment(segment1);
segmentAnnouncer.unannounceSegment(segment2);
testSegments.remove(segment1);
testSegments.remove(segment2);
waitForSync();
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
}
private DataSegment makeSegment(int offset)
{
return DataSegment.builder()
.dataSource("foo")
.interval(
new Interval(
new DateTime("2013-01-01").plusDays(offset),
new DateTime("2013-01-02").plusDays(offset)
)
)
.version(new DateTime().toString())
.build();
}
private void waitForSync() throws Exception
{
Stopwatch stopwatch = new Stopwatch().start();
while (Iterables.get(batchServerInventoryView.getInventory(), 0).getSegments().size() != testSegments.size()) {
Thread.sleep(500);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 5000) {
throw new ISE("BatchServerInventoryView is not updating");
}
}
}
}

View File

@@ -47,7 +47,7 @@ import java.util.Set;
/**
*/
public class BatchingCuratorDataSegmentAnnouncerTest
public class BatchDataSegmentAnnouncerTest
{
private static final String testBasePath = "/test";
private static final String testSegmentsPath = "/test/segments/id";
@@ -58,7 +58,7 @@ public class BatchingCuratorDataSegmentAnnouncerTest
private ObjectMapper jsonMapper;
private Announcer announcer;
private SegmentReader segmentReader;
private BatchingCuratorDataSegmentAnnouncer segmentAnnouncer;
private BatchDataSegmentAnnouncer segmentAnnouncer;
private Set<DataSegment> testSegments;
@Before
@@ -84,7 +84,7 @@ public class BatchingCuratorDataSegmentAnnouncerTest
announcer.start();
segmentReader = new SegmentReader(cf, jsonMapper);
segmentAnnouncer = new BatchingCuratorDataSegmentAnnouncer(
segmentAnnouncer = new BatchDataSegmentAnnouncer(
new DruidServerMetadata(
"id",
"host",
@@ -111,6 +111,12 @@ public class BatchingCuratorDataSegmentAnnouncerTest
{
return 100000;
}
@Override
public String getAnnouncerType()
{
return "batch";
}
},
announcer,
jsonMapper

View File

@@ -27,6 +27,8 @@ import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.test.KillSession;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -60,7 +62,6 @@ public class AnnouncerTest extends CuratorTestBase
public void testSanity() throws Exception
{
curator.start();
curator.create().forPath("/somewhere");
Announcer announcer = new Announcer(curator, exec);
final byte[] billy = "billy".getBytes();
@@ -163,4 +164,54 @@
announcer.stop();
}
}
@Test
public void testCleansUpItsLittleTurdlings() throws Exception
{
curator.start();
Announcer announcer = new Announcer(curator, exec);
final byte[] billy = "billy".getBytes();
final String testPath = "/somewhere/test2";
final String parent = ZKPaths.getPathAndNode(testPath).getPath();
announcer.start();
Assert.assertNull(curator.checkExists().forPath(parent));
announcer.announce(testPath, billy);
Assert.assertNotNull(curator.checkExists().forPath(parent));
announcer.stop();
Assert.assertNull(curator.checkExists().forPath(parent));
}
@Test
public void testLeavesBehindTurdlingsThatAlreadyExisted() throws Exception
{
curator.start();
Announcer announcer = new Announcer(curator, exec);
final byte[] billy = "billy".getBytes();
final String testPath = "/somewhere/test2";
final String parent = ZKPaths.getPathAndNode(testPath).getPath();
curator.create().forPath(parent);
final Stat initialStat = curator.checkExists().forPath(parent);
announcer.start();
Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
announcer.announce(testPath, billy);
Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
announcer.stop();
Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid());
}
}

View File

@@ -209,6 +209,14 @@ public class CuratorInventoryManagerTest extends CuratorTestBase
return container;
}
@Override
public Map<String, Integer> updateInventory(
Map<String, Integer> container, String inventoryKey, Integer inventory
)
{
return addInventory(container, inventoryKey, inventory);
}
@Override
public Map<String, Integer> removeInventory(Map<String, Integer> container, String inventoryKey)
{

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.6-SNAPSHOT</version>
<version>0.5.20-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -0,0 +1,12 @@
{
"queryType": "groupBy",
"dataSource": "rabbitmqtest",
"granularity": "all",
"dimensions": [],
"aggregations": [
{ "type": "count", "name": "rows" },
{"type": "longSum", "name": "imps", "fieldName": "impressions"},
{"type": "doubleSum", "name": "wp", "fieldName": "wp"}
],
"intervals": ["2010-01-01T00:00/2020-01-01T00:00"]
}

View File

@@ -0,0 +1,44 @@
[{
"schema" : {
"dataSource":"rabbitmqtest",
"aggregators":[
{"type":"count", "name":"impressions"},
{"type":"doubleSum","name":"wp","fieldName":"wp"}
],
"indexGranularity":"minute",
"shardSpec" : { "type": "none" }
},
"config" : {
"maxRowsInMemory" : 500000,
"intermediatePersistPeriod" : "PT1m"
},
"firehose" : {
"type" : "rabbitmq",
"connection" : {
"host": "localhost",
"username": "test-dude",
"password": "word-dude",
"virtualHost": "test-vhost"
},
"config" : {
"exchange": "test-exchange",
"queue" : "druidtest",
"routingKey": "#",
"durable": "true",
"exclusive": "false",
"autoDelete": "false"
},
"parser" : {
"timestampSpec" : { "column" : "utcdt", "format" : "iso" },
"data" : { "format" : "json" },
"dimensionExclusions" : ["wp"]
}
},
"plumber" : {
"type" : "realtime",
"windowPeriod" : "PT5m",
"segmentGranularity":"hour",
"basePersistDirectory" : "/tmp/realtime/basePersist",
"rejectionPolicy": { "type": "messageTime" }
}
}]

View File

@@ -1,12 +1,27 @@
{
"queryType": "groupBy",
"dataSource": "webstream",
"granularity": "all",
"dimensions": ["country"],
"aggregations":[
{ "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "known_users", "name": "known_users"}
],
"filter": { "type": "selector", "dimension": "geo_region", "value": "CA" },
"intervals":["2012-10-01T00:00/2020-01-01T00"]
"queryType":"groupBy",
"dataSource":"webstream",
"granularity":"minute",
"dimensions":[
"timezone"
],
"aggregations":[
{
"type":"count",
"name":"rows"
},
{
"type":"doubleSum",
"fieldName":"known_users",
"name":"known_users"
}
],
"filter":{
"type":"selector",
"dimension":"country",
"value":"US"
},
"intervals":[
"2013-06-01T00:00/2020-01-01T00"
]
}

View File

@@ -1,6 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-examples</artifactId>
@@ -10,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.6-SNAPSHOT</version>
<version>0.5.20-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -0,0 +1,182 @@
package druid.examples.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.cli.*;
import java.text.SimpleDateFormat;
import java.util.*;
/**
*
*/
public class RabbitMQProducerMain
{
public static void main(String[] args)
throws Exception
{
// We use a List to keep track of option insertion order. See below.
final List<Option> optionList = new ArrayList<Option>();
optionList.add(OptionBuilder.withLongOpt("help")
.withDescription("display this help message")
.create("h"));
optionList.add(OptionBuilder.withLongOpt("hostname")
.hasArg()
.withDescription("the hostname of the AMQP broker [defaults to AMQP library default]")
.create("b"));
optionList.add(OptionBuilder.withLongOpt("port")
.hasArg()
.withDescription("the port of the AMQP broker [defaults to AMQP library default]")
.create("n"));
optionList.add(OptionBuilder.withLongOpt("username")
.hasArg()
.withDescription("username to connect to the AMQP broker [defaults to AMQP library default]")
.create("u"));
optionList.add(OptionBuilder.withLongOpt("password")
.hasArg()
.withDescription("password to connect to the AMQP broker [defaults to AMQP library default]")
.create("p"));
optionList.add(OptionBuilder.withLongOpt("vhost")
.hasArg()
.withDescription("name of virtual host on the AMQP broker [defaults to AMQP library default]")
.create("v"));
optionList.add(OptionBuilder.withLongOpt("exchange")
.isRequired()
.hasArg()
.withDescription("name of the AMQP exchange [required - no default]")
.create("e"));
optionList.add(OptionBuilder.withLongOpt("key")
.hasArg()
.withDescription("the routing key to use when sending messages [default: 'default.routing.key']")
.create("k"));
optionList.add(OptionBuilder.withLongOpt("type")
.hasArg()
.withDescription("the type of exchange to create [default: 'topic']")
.create("t"));
optionList.add(OptionBuilder.withLongOpt("durable")
.withDescription("if set, a durable exchange will be declared [default: not set]")
.create("d"));
optionList.add(OptionBuilder.withLongOpt("autodelete")
.withDescription("if set, an auto-delete exchange will be declared [default: not set]")
.create("a"));
optionList.add(OptionBuilder.withLongOpt("single")
.withDescription("if set, only a single message will be sent [default: not set]")
.create("s"));
optionList.add(OptionBuilder.withLongOpt("start")
.hasArg()
.withDescription("time to use to start sending messages from [default: 2010-01-01T00:00:00]")
.create());
optionList.add(OptionBuilder.withLongOpt("stop")
.hasArg()
.withDescription("time to use to send messages until (format: '2013-07-18T23:45:59') [default: current time]")
.create());
optionList.add(OptionBuilder.withLongOpt("interval")
.hasArg()
.withDescription("the interval to add to the timestamp between messages in seconds [default: 10]")
.create());
optionList.add(OptionBuilder.withLongOpt("delay")
.hasArg()
.withDescription("the delay between sending messages in milliseconds [default: 100]")
.create());
// An extremely silly hack to maintain the above order in the help formatting.
HelpFormatter formatter = new HelpFormatter();
// Add a comparator to the HelpFormatter using the ArrayList above to sort by insertion order.
formatter.setOptionComparator(new Comparator(){
@Override
public int compare(Object o1, Object o2)
{
// I know this isn't fast, but who cares! The list is short.
return optionList.indexOf(o1) - optionList.indexOf(o2);
}
});
// Now we can add all the options to an Options instance. This is dumb!
Options options = new Options();
for (Option option : optionList) {
options.addOption(option);
}
CommandLine cmd = null;
try{
cmd = new BasicParser().parse(options, args);
}
catch(ParseException e){
formatter.printHelp("RabbitMQProducerMain", e.getMessage(), options, null);
System.exit(1);
}
if(cmd.hasOption("h")) {
formatter.printHelp("RabbitMQProducerMain", options);
System.exit(2);
}
ConnectionFactory factory = new ConnectionFactory();
if(cmd.hasOption("b")){
factory.setHost(cmd.getOptionValue("b"));
}
if(cmd.hasOption("u")){
factory.setUsername(cmd.getOptionValue("u"));
}
if(cmd.hasOption("p")){
factory.setPassword(cmd.getOptionValue("p"));
}
if(cmd.hasOption("v")){
factory.setVirtualHost(cmd.getOptionValue("v"));
}
if(cmd.hasOption("n")){
factory.setPort(Integer.parseInt(cmd.getOptionValue("n")));
}
String exchange = cmd.getOptionValue("e");
String routingKey = "default.routing.key";
if(cmd.hasOption("k")){
routingKey = cmd.getOptionValue("k");
}
boolean durable = cmd.hasOption("d");
boolean autoDelete = cmd.hasOption("a");
String type = cmd.getOptionValue("t", "topic");
boolean single = cmd.hasOption("single");
int interval = Integer.parseInt(cmd.getOptionValue("interval", "10"));
int delay = Integer.parseInt(cmd.getOptionValue("delay", "100"));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
Date stop = sdf.parse(cmd.getOptionValue("stop", sdf.format(new Date())));
Random r = new Random();
Calendar timer = Calendar.getInstance();
timer.setTime(sdf.parse(cmd.getOptionValue("start", "2010-01-01T00:00:00")));
String msg_template = "{\"utcdt\": \"%s\", \"wp\": %d, \"gender\": \"%s\", \"age\": %d}";
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchange, type, durable, autoDelete, null);
do{
int wp = (10 + r.nextInt(90)) * 100;
String gender = r.nextBoolean() ? "male" : "female";
int age = 20 + r.nextInt(70);
String line = String.format(msg_template, sdf.format(timer.getTime()), wp, gender, age);
channel.basicPublish(exchange, routingKey, null, line.getBytes());
System.out.println("Sent message: " + line);
timer.add(Calendar.SECOND, interval);
Thread.sleep(delay);
}while((!single && stop.after(timer.getTime())));
connection.close();
}
}

View File

@@ -21,11 +21,11 @@ package druid.examples.web;
import org.junit.Test;
import java.net.UnknownHostException;
import java.io.IOException;
public class WebJsonSupplierTest
{
@Test(expected = UnknownHostException.class)
@Test(expected = IOException.class)
public void checkInvalidUrl() throws Exception
{

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.6-SNAPSHOT</version>
<version>0.5.20-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -19,15 +19,14 @@
package com.metamx.druid.common.s3;
import com.google.common.base.Throwables;
import com.metamx.common.logger.Logger;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.ServiceException;
import org.jets3t.service.model.S3Object;
import java.io.File;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.Random;
import java.util.concurrent.Callable;
/**
*
@@ -36,37 +35,6 @@ public class S3Utils
{
private static final Logger log = new Logger(S3Utils.class);
public static void putFileToS3(
File localFile, RestS3Service s3Client, String outputS3Bucket, String outputS3Path
)
throws S3ServiceException, IOException, NoSuchAlgorithmException
{
S3Object s3Obj = new S3Object(localFile);
s3Obj.setBucketName(outputS3Bucket);
s3Obj.setKey(outputS3Path);
log.info("Uploading file[%s] to [s3://%s/%s]", localFile, s3Obj.getBucketName(), s3Obj.getKey());
s3Client.putObject(new S3Bucket(outputS3Bucket), s3Obj);
}
public static void putFileToS3WrapExceptions(
File localFile, RestS3Service s3Client, String outputS3Bucket, String outputS3Path
)
{
try {
putFileToS3(localFile, s3Client, outputS3Bucket, outputS3Path);
}
catch (S3ServiceException e) {
throw new RuntimeException(e);
}
catch (IOException e) {
throw new RuntimeException(e);
}
catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
public static void closeStreamsQuietly(S3Object s3Obj)
{
if (s3Obj == null) {
@@ -80,4 +48,52 @@ public class S3Utils
}
}
/**
* Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
* found, etc) are not retried.
*/
public static <T> T retryS3Operation(Callable<T> f) throws ServiceException, InterruptedException
{
int nTry = 0;
final int maxTries = 3;
while (true) {
try {
nTry++;
return f.call();
}
catch (IOException e) {
if (nTry <= maxTries) {
awaitNextRetry(e, nTry);
} else {
throw Throwables.propagate(e);
}
}
catch (ServiceException e) {
if (nTry <= maxTries &&
(e.getCause() instanceof IOException ||
(e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) {
awaitNextRetry(e, nTry);
} else {
throw e;
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
private static void awaitNextRetry(Exception e, int nTry) throws InterruptedException
{
final long baseSleepMillis = 1000;
final double fuzziness = 0.2;
final long sleepMillis = Math.max(
baseSleepMillis,
(long) (baseSleepMillis * Math.pow(2, nTry) *
(1 + new Random().nextGaussian() * fuzziness))
);
log.info(e, "S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis);
Thread.sleep(sleepMillis);
}
}
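A hypothetical caller (names assumed, not from the patch) showing how retryS3Operation wraps a jets3t call. With the constants above, a retried operation sleeps roughly 2s, 4s and 8s on tries 1 through 3, each fuzzed by about 20% of Gaussian noise.
import java.util.concurrent.Callable;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
public class RetryExample
{
  public static S3Object fetchWithRetries(final RestS3Service s3Client, final String bucket, final String key)
      throws Exception
  {
    // IOExceptions and RequestTimeout errors are retried; other
    // ServiceExceptions (access denied, missing key) fail immediately.
    return S3Utils.retryS3Operation(
        new Callable<S3Object>()
        {
          @Override
          public S3Object call() throws Exception
          {
            return s3Client.getObject(new S3Bucket(bucket), key);
          }
        }
    );
  }
}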

View File

@@ -30,6 +30,7 @@ import com.google.common.collect.Sets;
import com.google.common.primitives.Floats;
import com.metamx.common.ISE;
import com.metamx.druid.input.InputRow;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Arrays;
@@ -138,6 +139,12 @@ public class SpatialDimensionRowFormatter
{
return row.getFloatMetric(metric);
}
@Override
public String toString()
{
return row.toString();
}
};
if (!spatialPartialDimNames.isEmpty()) {

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.6-SNAPSHOT</version>
<version>0.5.20-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.6-SNAPSHOT</version>
<version>0.5.20-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -23,7 +23,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.5.6-SNAPSHOT</version>
<version>0.5.20-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
@@ -65,7 +65,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
<version>0.7.2</version>
<version>0.8.1</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.6-SNAPSHOT</version>
<version>0.5.20-SNAPSHOT</version>
</parent>
<dependencies>
@@ -120,6 +120,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.1.1</version>
</dependency>
<!-- Dependencies required for jets3t b/c emr pom doesn't include them -->
<dependency>

View File

@@ -60,13 +60,22 @@ public class DbSegmentPublisher implements SegmentPublisher
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentTable()
)
)
String statement;
if (!handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL")) {
statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentTable()
);
} else {
statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentTable()
);
}
handle.createStatement(statement)
.bind("id", segment.getIdentifier())
.bind("dataSource", segment.getDataSource())
.bind("created_date", new DateTime().toString())

View File

@@ -27,6 +27,7 @@ import java.io.IOException;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class),
@JsonSubTypes.Type(name = "rabbitmq", value = RabbitMQFirehoseFactory.class),
@JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class),
@JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class)
})

View File

@@ -0,0 +1,154 @@
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.rabbitmq.client.ConnectionFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
/**
* A Jacksonified version of the RabbitMQ ConnectionFactory for better integration
* into the realtime.spec configuration file format.
*/
public class JacksonifiedConnectionFactory extends ConnectionFactory
{
@Override
@JsonProperty
public String getHost()
{
return super.getHost();
}
@Override
public void setHost(String host)
{
super.setHost(host);
}
@Override
@JsonProperty
public int getPort()
{
return super.getPort();
}
@Override
public void setPort(int port)
{
super.setPort(port);
}
@Override
@JsonProperty
public String getUsername()
{
return super.getUsername();
}
@Override
public void setUsername(String username)
{
super.setUsername(username);
}
@Override
@JsonProperty
public String getPassword()
{
return super.getPassword();
}
@Override
public void setPassword(String password)
{
super.setPassword(password);
}
@Override
@JsonProperty
public String getVirtualHost()
{
return super.getVirtualHost();
}
@Override
public void setVirtualHost(String virtualHost)
{
super.setVirtualHost(virtualHost);
}
@Override
@JsonProperty
public void setUri(String uriString) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException
{
super.setUri(uriString);
}
@Override
@JsonProperty
public int getRequestedChannelMax()
{
return super.getRequestedChannelMax();
}
@Override
public void setRequestedChannelMax(int requestedChannelMax)
{
super.setRequestedChannelMax(requestedChannelMax);
}
@Override
@JsonProperty
public int getRequestedFrameMax()
{
return super.getRequestedFrameMax();
}
@Override
public void setRequestedFrameMax(int requestedFrameMax)
{
super.setRequestedFrameMax(requestedFrameMax);
}
@Override
@JsonProperty
public int getRequestedHeartbeat()
{
return super.getRequestedHeartbeat();
}
@Override
public void setConnectionTimeout(int connectionTimeout)
{
super.setConnectionTimeout(connectionTimeout);
}
@Override
@JsonProperty
public int getConnectionTimeout()
{
return super.getConnectionTimeout();
}
@Override
public void setRequestedHeartbeat(int requestedHeartbeat)
{
super.setRequestedHeartbeat(requestedHeartbeat);
}
@Override
@JsonProperty
public Map<String, Object> getClientProperties()
{
return super.getClientProperties();
}
@Override
public void setClientProperties(Map<String, Object> clientProperties)
{
super.setClientProperties(clientProperties);
}
}
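Because every getter above carries @JsonProperty and the setters follow bean conventions, Jackson can bind the "connection" block of realtime.spec straight onto this class. A hedged usage sketch — the JSON literal is illustrative, and it assumes the example sits somewhere JacksonifiedConnectionFactory is importable:

import com.fasterxml.jackson.databind.ObjectMapper;

public class ConnectionFactoryBindingExample
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    // The same shape as the "connection" element in realtime.spec.
    final String json = "{\"host\": \"localhost\", \"port\": 5672, \"username\": \"guest\"}";

    // Bean-style setters (setHost, setPort, ...) let Jackson populate the factory.
    final JacksonifiedConnectionFactory factory =
        mapper.readValue(json, JacksonifiedConnectionFactory.class);

    System.out.println(factory.getHost() + ":" + factory.getPort());
  }
}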

View File

@ -0,0 +1,82 @@
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* A configuration object for a RabbitMQ connection.
*/
public class RabbitMQFirehoseConfig
{
private String queue = null;
private String exchange = null;
private String routingKey = null;
private boolean durable = false;
private boolean exclusive = false;
private boolean autoDelete = false;
@JsonProperty
public String getQueue()
{
return queue;
}
public void setQueue(String queue)
{
this.queue = queue;
}
@JsonProperty
public String getExchange()
{
return exchange;
}
public void setExchange(String exchange)
{
this.exchange = exchange;
}
@JsonProperty
public String getRoutingKey()
{
return routingKey;
}
public void setRoutingKey(String routingKey)
{
this.routingKey = routingKey;
}
@JsonProperty
public boolean isDurable()
{
return durable;
}
public void setDurable(boolean durable)
{
this.durable = durable;
}
@JsonProperty
public boolean isExclusive()
{
return exclusive;
}
public void setExclusive(boolean exclusive)
{
this.exclusive = exclusive;
}
@JsonProperty
public boolean isAutoDelete()
{
return autoDelete;
}
public void setAutoDelete(boolean autoDelete)
{
this.autoDelete = autoDelete;
}
}

View File

@ -0,0 +1,220 @@
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
/**
* A FirehoseFactory for RabbitMQ.
* <p/>
* It receives its configuration through the realtime.spec file and expects to find
* "connection" and "config" elements in the firehose definition with values for a number of
* configuration options. Below is a complete example of a RabbitMQ firehose configuration with
* some explanation. Options that have defaults can be skipped, but options without defaults must
* be specified, with the exception of the URI property. If the URI property is set, it overrides
* any other connection property that was also set.
* <p/>
* File: <em>realtime.spec</em>
* <pre>
* "firehose" : {
* "type" : "rabbitmq",
* "connection" : {
* "host": "localhost", # The hostname of the RabbitMQ broker to connect to. Default: 'localhost'
* "port": "5672", # The port number to connect to on the RabbitMQ broker. Default: '5672'
* "username": "test-dude", # The username to use to connect to RabbitMQ. Default: 'guest'
* "password": "test-word", # The password to use to connect to RabbitMQ. Default: 'guest'
* "virtualHost": "test-vhost", # The virtual host to connect to. Default: '/'
* "uri": "amqp://mqserver:1234/vhost", # The URI string to use to connect to RabbitMQ. No default and not needed
* },
* "config" : {
* "exchange": "test-exchange", # The exchange to connect to. No default
* "queue" : "druidtest", # The queue to connect to or create. No default
* "routingKey": "#", # The routing key to use to bind the queue to the exchange. No default
* "durable": "true", # Whether the queue should be durable. Default: 'false'
* "exclusive": "false", # Whether the queue should be exclusive. Default: 'false'
* "autoDelete": "false" # Whether the queue should auto-delete on disconnect. Default: 'false'
* },
* "parser" : {
* "timestampSpec" : { "column" : "utcdt", "format" : "iso" },
* "data" : { "format" : "json" },
* "dimensionExclusions" : ["wp"]
* }
* },
* </pre>
* <p/>
* <b>Limitations:</b> This implementation will not attempt to reconnect to the MQ broker if the
* connection to it is lost. Furthermore it does not support any automatic failover on high availability
* RabbitMQ clusters. This is not supported by the underlying AMQP client library and while the behavior
* could be "faked" to some extent we haven't implemented that yet. However, if a policy is defined in
* the RabbitMQ cluster that sets the "ha-mode" and "ha-sync-mode" properly on the queue that this
* Firehose connects to, messages should survive an MQ broker node failure and be delivered once a
* connection to another node is set up.
* <p/>
* For more information on RabbitMQ high availability please see:
* <a href="http://www.rabbitmq.com/ha.html">http://www.rabbitmq.com/ha.html</a>.
*/
public class RabbitMQFirehoseFactory implements FirehoseFactory
{
private static final Logger log = new Logger(RabbitMQFirehoseFactory.class);
@JsonProperty
private final RabbitMQFirehoseConfig config;
@JsonProperty
private final StringInputRowParser parser;
@JsonProperty
private final ConnectionFactory connectionFactory;
@JsonCreator
public RabbitMQFirehoseFactory(
@JsonProperty("connection") JacksonifiedConnectionFactory connectionFactory,
@JsonProperty("config") RabbitMQFirehoseConfig config,
@JsonProperty("parser") StringInputRowParser parser
)
{
this.connectionFactory = connectionFactory;
this.config = config;
this.parser = parser;
}
@Override
public Firehose connect() throws IOException
{
String queue = config.getQueue();
String exchange = config.getExchange();
String routingKey = config.getRoutingKey();
boolean durable = config.isDurable();
boolean exclusive = config.isExclusive();
boolean autoDelete = config.isAutoDelete();
final Connection connection = connectionFactory.newConnection();
connection.addShutdownListener(new ShutdownListener()
{
@Override
public void shutdownCompleted(ShutdownSignalException cause)
{
log.warn(cause, "Connection closed!");
// FUTURE: we could try to re-establish the connection here. Not done in this version though.
}
});
final Channel channel = connection.createChannel();
channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
channel.queueBind(queue, exchange, routingKey);
channel.addShutdownListener(new ShutdownListener()
{
@Override
public void shutdownCompleted(ShutdownSignalException cause)
{
log.warn(cause, "Channel closed!");
// FUTURE: we could try to re-establish the channel here. Not done in this version though.
}
});
// We create a QueueingConsumer that will not auto-acknowledge messages since that
// happens on commit().
final QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queue, false, consumer);
return new Firehose()
{
/**
* Storing the latest delivery as a member variable should be safe since this will only be run
* by a single thread.
*/
private QueueingConsumer.Delivery delivery;
/**
* Store the latest delivery tag to be able to commit (acknowledge) the message delivery up to
* and including this tag. See commit() for more detail.
*/
private long lastDeliveryTag;
@Override
public boolean hasMore()
{
delivery = null;
try {
// Wait for the next delivery. This will block until something is available.
delivery = consumer.nextDelivery();
if (delivery != null) {
lastDeliveryTag = delivery.getEnvelope().getDeliveryTag();
// If delivery is non-null, we report that there is something more to process.
return true;
}
}
catch (InterruptedException e) {
// It's a little unclear how we should handle this. At any rate, we're in an
// unknown state now, so let's log something and return false.
log.wtf(e, "Got interrupted while waiting for next delivery. Doubt this should ever happen.");
}
// Either delivery is null or we caught the exception above, so we report that
// we have nothing more to process.
return false;
}
@Override
public InputRow nextRow()
{
if (delivery == null) {
// Just making sure.
log.wtf("I have nothing in delivery. Method hasMore() should have returned false.");
return null;
}
return parser.parse(new String(delivery.getBody()));
}
@Override
public Runnable commit()
{
// This method will be called from the same thread that calls the other methods of
// this Firehose. However, the returned Runnable will be called by a different thread.
//
// It should be (thread) safe to copy the lastDeliveryTag like we do below and then
// acknowledge values up to and including that value.
return new Runnable()
{
// Copy the last delivery tag so the Runnable captures a stable value and stays thread safe.
final long deliveryTag = lastDeliveryTag;
@Override
public void run()
{
try {
log.info("Acknowledging delivery of messages up to tag: " + deliveryTag);
// Acknowledge all messages up to and including the stored delivery tag.
channel.basicAck(deliveryTag, true);
}
catch (IOException e) {
log.error(e, "Unable to acknowledge message reception to message queue.");
}
}
};
}
@Override
public void close() throws IOException
{
log.info("Closing connection to RabbitMQ");
channel.close();
connection.close();
}
};
}
}
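A minimal driver sketch for the contract this firehose implements — not part of the patch. In a real plumber the Runnable returned by commit() would run later, typically after a successful persist (an assumption here), which is what makes the deferred basicAck(deliveryTag, true) — acknowledge everything up to and including the tag — useful:

// Hypothetical driver, assuming a configured RabbitMQFirehoseFactory `factory`.
final Firehose firehose = factory.connect();
try {
  while (firehose.hasMore()) {
    final InputRow row = firehose.nextRow();
    // ... hand the row to the indexer ...

    // commit() captures the last delivery tag; running the Runnable acks
    // every delivery up to and including that tag.
    final Runnable commitRunnable = firehose.commit();
    commitRunnable.run(); // in practice deferred until the rows are persisted
  }
}
finally {
  firehose.close();
}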

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -37,6 +38,8 @@ import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.Query;
import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.ServerView;
@ -50,6 +53,7 @@ import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.partition.SingleElementPartitionChunk;
import com.metamx.druid.query.MetricsEmittingQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
@ -186,6 +190,9 @@ public class RealtimePlumberSchool implements PlumberSchool
private volatile ScheduledExecutorService scheduledExecutor = null;
private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>(
String.CASE_INSENSITIVE_ORDER
);
@Override
public void startJob()
@ -219,6 +226,7 @@ public class RealtimePlumberSchool implements PlumberSchool
try {
segmentAnnouncer.announceSegment(retVal.getSegment());
sinks.put(truncatedTime, retVal);
sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), new SingleElementPartitionChunk<Sink>(retVal));
}
catch (IOException e) {
log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource())
@ -247,17 +255,23 @@ public class RealtimePlumberSchool implements PlumberSchool
}
};
List<TimelineObjectHolder<String, Sink>> querySinks = Lists.newArrayList();
for (Interval interval : query.getIntervals()) {
querySinks.addAll(sinkTimeline.lookup(interval));
}
return toolchest.mergeResults(
factory.mergeRunners(
EXEC,
FunctionalIterable
.create(sinks.values())
.create(querySinks)
.transform(
new Function<Sink, QueryRunner<T>>()
new Function<TimelineObjectHolder<String, Sink>, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(Sink input)
public QueryRunner<T> apply(TimelineObjectHolder<String, Sink> holder)
{
final Sink theSink = holder.getObject().getChunk(0).getObject();
return new SpecificSegmentQueryRunner<T>(
new MetricsEmittingQueryRunner<T>(
emitter,
@ -265,7 +279,7 @@ public class RealtimePlumberSchool implements PlumberSchool
factory.mergeRunners(
EXEC,
Iterables.transform(
input,
theSink,
new Function<FireHydrant, QueryRunner<T>>()
{
@Override
@ -279,9 +293,9 @@ public class RealtimePlumberSchool implements PlumberSchool
),
new SpecificSegmentSpec(
new SegmentDescriptor(
input.getInterval(),
input.getSegment().getVersion(),
input.getSegment().getShardSpec().getPartitionNum()
holder.getInterval(),
theSink.getSegment().getVersion(),
theSink.getSegment().getShardSpec().getPartitionNum()
)
)
);
@ -442,6 +456,11 @@ public class RealtimePlumberSchool implements PlumberSchool
Sink currSink = new Sink(sinkInterval, schema, versioningPolicy.getVersion(sinkInterval), hydrants);
sinks.put(sinkInterval.getStartMillis(), currSink);
sinkTimeline.add(
currSink.getInterval(),
currSink.getVersion(),
new SingleElementPartitionChunk<Sink>(currSink)
);
segmentAnnouncer.announceSegment(currSink.getSegment());
}
@ -490,6 +509,11 @@ public class RealtimePlumberSchool implements PlumberSchool
FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
log.info("Removing sinkKey %d for segment %s", sinkKey, sink.getSegment().getIdentifier());
sinks.remove(sinkKey);
sinkTimeline.remove(
sink.getInterval(),
sink.getVersion(),
new SingleElementPartitionChunk<Sink>(sink)
);
synchronized (handoffCondition) {
handoffCondition.notifyAll();

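The plumber changes above all serve one pattern: every sink added to or removed from the sinks map is mirrored in a VersionedIntervalTimeline, so queries touch only sinks whose intervals overlap the query instead of scanning all of them. A sketch of the lookup side, with names taken from the diff:

// Collect only the sinks overlapping the query's intervals; each holder
// carries a single-element partition chunk because that is how sinks are
// registered in the timeline above.
final List<TimelineObjectHolder<String, Sink>> querySinks = Lists.newArrayList();
for (Interval interval : query.getIntervals()) {
  querySinks.addAll(sinkTimeline.lookup(interval));
}
for (TimelineObjectHolder<String, Sink> holder : querySinks) {
  final Sink theSink = holder.getObject().getChunk(0).getObject();
  // ... wrap theSink in a per-segment QueryRunner, as the diff does ...
}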
View File

@ -90,6 +90,11 @@ public class Sink implements Iterable<FireHydrant>
makeNewCurrIndex(interval.getStartMillis(), schema);
}
public String getVersion()
{
return version;
}
public Interval getInterval()
{
return interval;

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.6-SNAPSHOT</version>
<version>0.5.20-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -50,14 +50,12 @@ public class ZkCoordinator implements DataSegmentChangeHandler
private final ObjectMapper jsonMapper;
private final ZkCoordinatorConfig config;
private final ZkPathsConfig zkPaths;
private final DruidServerMetadata me;
private final DataSegmentAnnouncer announcer;
private final CuratorFramework curator;
private final ServerManager serverManager;
private final String loadQueueLocation;
private final String servedSegmentsLocation;
private volatile PathChildrenCache loadQueueCache;
private volatile boolean started;
@ -73,13 +71,11 @@ public class ZkCoordinator implements DataSegmentChangeHandler
{
this.jsonMapper = jsonMapper;
this.config = config;
this.zkPaths = zkPaths;
this.me = me;
this.announcer = announcer;
this.curator = curator;
this.serverManager = serverManager;
this.loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
this.servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
}
@LifecycleStart
@ -91,6 +87,10 @@ public class ZkCoordinator implements DataSegmentChangeHandler
return;
}
final String loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
final String servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName());
loadQueueCache = new PathChildrenCache(
curator,
loadQueueLocation,
@ -104,6 +104,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
if (config.isLoadFromSegmentCacheEnabled()) {
loadCache();

View File

@ -20,7 +20,6 @@
package com.metamx.druid.db;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
@ -33,7 +32,7 @@ import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.partition.PartitionChunk;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
@ -45,7 +44,6 @@ import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import javax.annotation.Nullable;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
@ -175,17 +173,16 @@ public class DatabaseSegmentManager
}
);
final List<DataSegment> segments = Lists.transform(
segmentTimeline.lookup(new Interval(new DateTime(0), new DateTime("3000-01-01"))),
new Function<TimelineObjectHolder<String, DataSegment>, DataSegment>()
{
@Override
public DataSegment apply(@Nullable TimelineObjectHolder<String, DataSegment> input)
{
return input.getObject().getChunk(0).getObject();
}
}
);
final List<DataSegment> segments = Lists.newArrayList();
for (TimelineObjectHolder<String, DataSegment> objectHolder : segmentTimeline.lookup(
new Interval(
"0000-01-01/3000-01-01"
)
)) {
for (PartitionChunk<DataSegment> partitionChunk : objectHolder.getObject()) {
segments.add(partitionChunk.getObject());
}
}
if (segments.isEmpty()) {
log.warn("No segments found in the database!");
@ -451,4 +448,4 @@ public class DatabaseSegmentManager
log.error(e, "Problem polling DB.");
}
}
}
}
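Besides dropping the Function-based transform, the rewritten loop fixes a completeness issue: the old version took only getChunk(0) from each holder, which silently drops any additional shards. A sketch of the difference, assuming a holder can carry multiple partition chunks:

for (TimelineObjectHolder<String, DataSegment> holder :
    segmentTimeline.lookup(new Interval("0000-01-01/3000-01-01"))) {
  // Old: segments.add(holder.getObject().getChunk(0).getObject()); // partition 0 only
  for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
    segments.add(chunk.getObject()); // every shard in the holder
  }
}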

View File

@ -27,13 +27,16 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.IAE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.BatchServerInventoryView;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.SingleServerInventoryView;
import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.config.ConfigManager;
@ -131,9 +134,31 @@ public class MasterMain
final ExecutorService exec = Executors.newFixedThreadPool(
1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build()
);
ServerInventoryView serverInventoryView = new ServerInventoryView(
configFactory.build(ServerInventoryViewConfig.class), zkPaths, curatorFramework, exec, jsonMapper
);
final ServerInventoryViewConfig serverInventoryViewConfig = configFactory.build(ServerInventoryViewConfig.class);
final String announcerType = serverInventoryViewConfig.getAnnouncerType();
final ServerInventoryView serverInventoryView;
if ("legacy".equalsIgnoreCase(announcerType)) {
serverInventoryView = new SingleServerInventoryView(
serverInventoryViewConfig,
zkPaths,
curatorFramework,
exec,
jsonMapper
);
} else if ("batch".equalsIgnoreCase(announcerType)) {
serverInventoryView = new BatchServerInventoryView(
serverInventoryViewConfig,
zkPaths,
curatorFramework,
exec,
jsonMapper
);
} else {
throw new IAE("Unknown type %s", announcerType);
}
lifecycle.addManagedInstance(serverInventoryView);
final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);

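This announcer-type dispatch is duplicated in QueryableNode. A hedged refactoring sketch of how it could be shared — the method name is an assumption; the branch logic and constructor arguments come from the diff:

public static ServerInventoryView makeServerInventoryView(
    final ServerInventoryViewConfig config,
    final ZkPathsConfig zkPaths,
    final CuratorFramework curator,
    final ExecutorService exec,
    final ObjectMapper jsonMapper
)
{
  final String announcerType = config.getAnnouncerType();
  if ("legacy".equalsIgnoreCase(announcerType)) {
    return new SingleServerInventoryView(config, zkPaths, curator, exec, jsonMapper);
  } else if ("batch".equalsIgnoreCase(announcerType)) {
    return new BatchServerInventoryView(config, zkPaths, curator, exec, jsonMapper);
  } else {
    throw new IAE("Unknown type %s", announcerType);
  }
}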
View File

@ -19,6 +19,7 @@
package com.metamx.druid.loading;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
@ -30,16 +31,17 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.common.s3.S3Utils;
import com.metamx.druid.utils.CompressionUtils;
import org.apache.commons.io.FileUtils;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.zip.GZIPInputStream;
/**
@ -62,9 +64,9 @@ public class S3DataSegmentPuller implements DataSegmentPuller
}
@Override
public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException
{
S3Coords s3Coords = new S3Coords(segment);
final S3Coords s3Coords = new S3Coords(segment);
log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);
@ -80,41 +82,52 @@ public class S3DataSegmentPuller implements DataSegmentPuller
throw new ISE("outDir[%s] must be a directory.", outDir);
}
long startTime = System.currentTimeMillis();
S3Object s3Obj = null;
try {
s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path);
S3Utils.retryS3Operation(
new Callable<Void>()
{
@Override
public Void call() throws Exception
{
long startTime = System.currentTimeMillis();
S3Object s3Obj = null;
InputStream in = null;
try {
in = s3Obj.getDataInputStream();
final String key = s3Obj.getKey();
if (key.endsWith(".zip")) {
CompressionUtils.unzip(in, outDir);
} else if (key.endsWith(".gz")) {
final File outFile = new File(outDir, toFilename(key, ".gz"));
ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile));
} else {
ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, ""))));
}
log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime);
}
catch (IOException e) {
FileUtils.deleteDirectory(outDir);
throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj);
}
finally {
Closeables.closeQuietly(in);
}
try {
s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path);
InputStream in = null;
try {
in = s3Obj.getDataInputStream();
final String key = s3Obj.getKey();
if (key.endsWith(".zip")) {
CompressionUtils.unzip(in, outDir);
} else if (key.endsWith(".gz")) {
final File outFile = new File(outDir, toFilename(key, ".gz"));
ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile));
} else {
ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, ""))));
}
log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime);
return null;
}
catch (IOException e) {
FileUtils.deleteDirectory(outDir);
throw new IOException(String.format("Problem decompressing object[%s]", s3Obj), e);
}
finally {
Closeables.closeQuietly(in);
}
}
finally {
S3Utils.closeStreamsQuietly(s3Obj);
}
}
}
);
}
catch (Exception e) {
throw new SegmentLoadingException(e, e.getMessage());
}
finally {
S3Utils.closeStreamsQuietly(s3Obj);
}
}
private String toFilename(String key, final String suffix)
@ -124,25 +137,49 @@ public class S3DataSegmentPuller implements DataSegmentPuller
return filename;
}
private boolean isObjectInBucket(S3Coords coords) throws SegmentLoadingException
private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException
{
try {
return s3Client.isObjectInBucket(coords.bucket, coords.path);
return S3Utils.retryS3Operation(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return s3Client.isObjectInBucket(coords.bucket, coords.path);
}
}
);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
catch (ServiceException e) {
throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
}
}
@Override
public long getLastModified(DataSegment segment) throws SegmentLoadingException
{
S3Coords coords = new S3Coords(segment);
final S3Coords coords = new S3Coords(segment);
try {
S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path);
final StorageObject objDetails = S3Utils.retryS3Operation(
new Callable<StorageObject>()
{
@Override
public StorageObject call() throws Exception
{
return s3Client.getObjectDetails(coords.bucket, coords.path);
}
}
);
return objDetails.getLastModifiedDate().getTime();
}
catch (S3ServiceException e) {
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
catch (ServiceException e) {
throw new SegmentLoadingException(e, e.getMessage());
}
}
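S3Utils.retryS3Operation(...) wraps each jets3t call in a Callable and retries transient failures; its implementation is not shown in this diff, so the following is only a sketch of the pattern, with assumed try counts and backoff:

import java.io.IOException;
import java.util.concurrent.Callable;

import org.jets3t.service.ServiceException;

public class RetrySketch
{
  // Retry with capped exponential backoff; maxTries and the delays are assumptions.
  public static <T> T retryS3Operation(Callable<T> operation) throws Exception
  {
    final int maxTries = 10;
    for (int attempt = 1; ; attempt++) {
      try {
        return operation.call();
      }
      catch (IOException e) {
        if (attempt >= maxTries) {
          throw e;
        }
      }
      catch (ServiceException e) {
        if (attempt >= maxTries) {
          throw e;
        }
      }
      Thread.sleep(Math.min(60000L, 1000L << Math.min(attempt, 6))); // capped backoff
    }
  }
}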

View File

@ -80,7 +80,9 @@ public class S3DataSegmentPusher implements DataSegmentPusher
final String outputBucket = config.getBucket();
toPush.setBucketName(outputBucket);
toPush.setKey(outputKey + "/index.zip");
toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
if (!config.getDisableAcl()) {
toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
}
log.info("Pushing %s.", toPush);
s3Client.putObject(outputBucket, toPush);
@ -96,7 +98,9 @@ public class S3DataSegmentPusher implements DataSegmentPusher
S3Object descriptorObject = new S3Object(descriptorFile);
descriptorObject.setBucketName(outputBucket);
descriptorObject.setKey(outputKey + "/descriptor.json");
descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
if (!config.getDisableAcl()) {
descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
}
log.info("Pushing %s", descriptorObject);
s3Client.putObject(outputBucket, descriptorObject);

View File

@ -32,4 +32,8 @@ public abstract class S3DataSegmentPusherConfig
@Config("druid.pusher.s3.baseKey")
@Default("")
public abstract String getBaseKey();
@Config("druid.pusher.s3.disableAcl")
@Default("false")
public abstract boolean getDisableAcl();
}
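A usage note: the @Config binding above means the switch is driven by the runtime property druid.pusher.s3.disableAcl (default false). Setting it to true makes S3DataSegmentPusher skip the GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL ACL on both the index.zip and descriptor.json uploads, as the guards added above show.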

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
@ -84,7 +85,7 @@ public class DruidMaster
private final ZkPathsConfig zkPaths;
private final JacksonConfigManager configManager;
private final DatabaseSegmentManager databaseSegmentManager;
private final ServerInventoryView serverInventoryView;
private final ServerInventoryView<Object> serverInventoryView;
private final DatabaseRuleManager databaseRuleManager;
private final CuratorFramework curator;
private final ServiceEmitter emitter;
@ -247,144 +248,106 @@ public class DruidMaster
public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback)
{
final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
if (fromServer == null) {
throw new IllegalArgumentException(String.format("Unable to find server [%s]", from));
}
try {
final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
if (fromServer == null) {
throw new IAE("Unable to find server [%s]", from);
}
final DruidServer toServer = serverInventoryView.getInventoryValue(to);
if (toServer == null) {
throw new IllegalArgumentException(String.format("Unable to find server [%s]", to));
}
final DruidServer toServer = serverInventoryView.getInventoryValue(to);
if (toServer == null) {
throw new IAE("Unable to find server [%s]", to);
}
if (to.equalsIgnoreCase(from)) {
throw new IllegalArgumentException(
String.format("Redundant command to move segment [%s] from [%s] to [%s]", segmentName, from, to)
if (to.equalsIgnoreCase(from)) {
throw new IAE("Redundant command to move segment [%s] from [%s] to [%s]", segmentName, from, to);
}
final DataSegment segment = fromServer.getSegment(segmentName);
if (segment == null) {
throw new IAE("Unable to find segment [%s] on server [%s]", segmentName, from);
}
final LoadQueuePeon loadPeon = loadManagementPeons.get(to);
if (loadPeon == null) {
throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", to);
}
final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
if (dropPeon == null) {
throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", from);
}
final ServerHolder toHolder = new ServerHolder(toServer, loadPeon);
if (toHolder.getAvailableSize() < segment.getSize()) {
throw new IAE(
"Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
to,
segment,
segment.getSize(),
toHolder.getAvailableSize()
);
}
final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName);
final String toServedSegPath = ZKPaths.makePath(
ZKPaths.makePath(serverInventoryView.getInventoryManagerConfig().getInventoryPath(), to), segmentName
);
}
final DataSegment segment = fromServer.getSegment(segmentName);
if (segment == null) {
throw new IllegalArgumentException(
String.format("Unable to find segment [%s] on server [%s]", segmentName, from)
);
}
final LoadQueuePeon loadPeon = loadManagementPeons.get(to);
if (loadPeon == null) {
throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", to));
}
final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
if (dropPeon == null) {
throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", from));
}
final ServerHolder toHolder = new ServerHolder(toServer, loadPeon);
if (toHolder.getAvailableSize() < segment.getSize()) {
throw new IllegalArgumentException(
String.format(
"Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
to,
segment,
segment.getSize(),
toHolder.getAvailableSize()
)
);
}
final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName);
final String toServedSegPath = ZKPaths.makePath(
ZKPaths.makePath(zkPaths.getServedSegmentsPath(), to), segmentName
);
loadPeon.loadSegment(
segment,
new LoadPeonCallback()
{
@Override
protected void execute()
loadPeon.loadSegment(
segment,
new LoadPeonCallback()
{
try {
if (curator.checkExists().forPath(toServedSegPath) != null &&
curator.checkExists().forPath(toLoadQueueSegPath) == null &&
!dropPeon.getSegmentsToDrop().contains(segment)) {
dropPeon.dropSegment(segment, callback);
} else if (callback != null) {
callback.execute();
@Override
protected void execute()
{
try {
if (curator.checkExists().forPath(toServedSegPath) != null &&
curator.checkExists().forPath(toLoadQueueSegPath) == null &&
!dropPeon.getSegmentsToDrop().contains(segment)) {
dropPeon.dropSegment(segment, callback);
} else if (callback != null) {
callback.execute();
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
public void cloneSegment(String from, String to, String segmentName, LoadPeonCallback callback)
{
final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
if (fromServer == null) {
throw new IllegalArgumentException(String.format("Unable to find server [%s]", from));
}
final DruidServer toServer = serverInventoryView.getInventoryValue(to);
if (toServer == null) {
throw new IllegalArgumentException(String.format("Unable to find server [%s]", to));
}
final DataSegment segment = fromServer.getSegment(segmentName);
if (segment == null) {
throw new IllegalArgumentException(
String.format("Unable to find segment [%s] on server [%s]", segmentName, from)
);
}
final LoadQueuePeon loadPeon = loadManagementPeons.get(to);
if (loadPeon == null) {
throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", to));
}
final ServerHolder toHolder = new ServerHolder(toServer, loadPeon);
if (toHolder.getAvailableSize() < segment.getSize()) {
throw new IllegalArgumentException(
String.format(
"Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
to,
segment,
segment.getSize(),
toHolder.getAvailableSize()
)
);
}
if (!loadPeon.getSegmentsToLoad().contains(segment)) {
loadPeon.loadSegment(segment, callback);
catch (Exception e) {
log.makeAlert(e, "Exception moving segment %s", segmentName).emit();
callback.execute();
}
}
public void dropSegment(String from, String segmentName, final LoadPeonCallback callback)
{
final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
if (fromServer == null) {
throw new IllegalArgumentException(String.format("Unable to find server [%s]", from));
}
try {
final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
if (fromServer == null) {
throw new IAE("Unable to find server [%s]", from);
}
final DataSegment segment = fromServer.getSegment(segmentName);
if (segment == null) {
throw new IllegalArgumentException(
String.format("Unable to find segment [%s] on server [%s]", segmentName, from)
);
}
final DataSegment segment = fromServer.getSegment(segmentName);
if (segment == null) {
throw new IAE("Unable to find segment [%s] on server [%s]", segmentName, from);
}
final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
if (dropPeon == null) {
throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", from));
}
final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
if (dropPeon == null) {
throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", from);
}
if (!dropPeon.getSegmentsToDrop().contains(segment)) {
dropPeon.dropSegment(segment, callback);
if (!dropPeon.getSegmentsToDrop().contains(segment)) {
dropPeon.dropSegment(segment, callback);
}
}
catch (Exception e) {
log.makeAlert(e, "Exception dropping segment %s", segmentName).emit();
callback.execute();
}
}
@ -543,7 +506,7 @@ public class DruidMaster
}
catch (Exception e) {
log.makeAlert(e, "Unable to become master")
.emit();
.emit();
final LeaderLatch oldLatch = createNewLeaderLatch();
Closeables.closeQuietly(oldLatch);
try {

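Net effect of the rewrite above: lookup and validation failures in moveSegment/dropSegment now throw IAE inside a try block, are emitted as alerts, and still execute the supplied callback, instead of an IllegalArgumentException escaping to the caller.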
View File

@ -84,19 +84,19 @@ public class ReplicationThrottler
}
}
public boolean canAddReplicant(String tier)
public boolean canCreateReplicant(String tier)
{
return replicatingLookup.get(tier);
return replicatingLookup.get(tier) && !currentlyReplicating.isAtMaxReplicants(tier);
}
public boolean canDestroyReplicant(String tier)
{
return terminatingLookup.get(tier);
return terminatingLookup.get(tier) && !currentlyTerminating.isAtMaxReplicants(tier);
}
public boolean registerReplicantCreation(String tier, String segmentId, String serverId)
public void registerReplicantCreation(String tier, String segmentId, String serverId)
{
return currentlyReplicating.addSegment(tier, segmentId, serverId);
currentlyReplicating.addSegment(tier, segmentId, serverId);
}
public void unregisterReplicantCreation(String tier, String segmentId, String serverId)
@ -104,9 +104,9 @@ public class ReplicationThrottler
currentlyReplicating.removeSegment(tier, segmentId, serverId);
}
public boolean registerReplicantTermination(String tier, String segmentId, String serverId)
public void registerReplicantTermination(String tier, String segmentId, String serverId)
{
return currentlyTerminating.addSegment(tier, segmentId, serverId);
currentlyTerminating.addSegment(tier, segmentId, serverId);
}
public void unregisterReplicantTermination(String tier, String segmentId, String serverId)
@ -119,19 +119,23 @@ public class ReplicationThrottler
private final Map<String, ConcurrentHashMap<String, String>> currentlyProcessingSegments = Maps.newHashMap();
private final Map<String, Integer> lifetimes = Maps.newHashMap();
public boolean addSegment(String tier, String segmentId, String serverId)
public boolean isAtMaxReplicants(String tier)
{
final ConcurrentHashMap<String, String> segments = currentlyProcessingSegments.get(tier);
return (segments != null && segments.size() >= maxReplicants);
}
public void addSegment(String tier, String segmentId, String serverId)
{
ConcurrentHashMap<String, String> segments = currentlyProcessingSegments.get(tier);
if (segments == null) {
segments = new ConcurrentHashMap<String, String>();
currentlyProcessingSegments.put(tier, segments);
}
if (segments.size() < maxReplicants) {
segments.put(segmentId, serverId);
return true;
}
return false;
if (!isAtMaxReplicants(tier)) {
segments.put(segmentId, serverId);
}
}
public void removeSegment(String tier, String segmentId, String serverId)

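The signature changes above split what used to be a single operation: registerReplicantCreation(...) previously returned a boolean and doubled as the capacity check. A hedged sketch of the new calling contract — variable names are illustrative; the LoadRule diff below shows the real call sites:

// Before: registration doubled as the throttle check and could fail silently.
//   if (!throttler.registerReplicantCreation(tier, segmentId, serverId)) { break; }

// After: check capacity first, then register unconditionally.
if (!throttler.canCreateReplicant(tier)) {
  break; // tier disabled for replication, or already at maxReplicants
}
throttler.registerReplicantCreation(tier, segmentId, serverId);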
View File

@ -92,6 +92,12 @@ public abstract class LoadRule implements Rule
final MasterStats stats = new MasterStats();
while (totalReplicants < expectedReplicants) {
boolean replicate = totalReplicants > 0;
if (replicate && !replicationManager.canCreateReplicant(getTier())) {
break;
}
final ServerHolder holder = analyzer.findNewSegmentHomeAssign(segment, serverHolderList);
if (holder == null) {
@ -104,15 +110,10 @@ public abstract class LoadRule implements Rule
break;
}
if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
if (!replicationManager.canAddReplicant(getTier()) ||
!replicationManager.registerReplicantCreation(
getTier(),
segment.getIdentifier(),
holder.getServer().getHost()
)) {
break;
}
if (replicate) {
replicationManager.registerReplicantCreation(
getTier(), segment.getIdentifier(), holder.getServer().getHost()
);
}
holder.getPeon().loadSegment(
@ -180,15 +181,16 @@ public abstract class LoadRule implements Rule
if (holder.isServingSegment(segment)) {
if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants
if (!replicationManager.canDestroyReplicant(getTier()) ||
!replicationManager.registerReplicantTermination(
getTier(),
segment.getIdentifier(),
holder.getServer().getHost()
)) {
if (!replicationManager.canDestroyReplicant(getTier())) {
serverQueue.add(holder);
break;
}
replicationManager.registerReplicantTermination(
getTier(),
segment.getIdentifier(),
holder.getServer().getHost()
);
}
holder.getPeon().dropSegment(

View File

@ -296,6 +296,7 @@ public class DruidSetup
createPath(curator, zkPaths.getMasterPath(), out);
createPath(curator, zkPaths.getLoadQueuePath(), out);
createPath(curator, zkPaths.getServedSegmentsPath(), out);
createPath(curator, zkPaths.getLiveSegmentsPath(), out);
createPath(curator, zkPaths.getPropertiesPath(), out);
}

View File

@ -93,7 +93,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
}
};
announcer = new CuratorDataSegmentAnnouncer(
announcer = new SingleDataSegmentAnnouncer(
me, zkPaths, new Announcer(curator, Execs.singleThreaded("blah")), jsonMapper
);

View File

@ -23,7 +23,8 @@ import com.google.common.collect.MapMaker;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.SingleServerInventoryView;
import com.metamx.druid.curator.inventory.InventoryManagerConfig;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.metrics.NoopServiceEmitter;
@ -44,7 +45,7 @@ public class DruidMasterTest
private CuratorFramework curator;
private LoadQueueTaskMaster taskMaster;
private DatabaseSegmentManager databaseSegmentManager;
private ServerInventoryView serverInventoryView;
private SingleServerInventoryView serverInventoryView;
private ScheduledExecutorFactory scheduledExecutorFactory;
private DruidServer druidServer;
private DataSegment segment;
@ -58,7 +59,7 @@ public class DruidMasterTest
segment = EasyMock.createNiceMock(DataSegment.class);
loadQueuePeon = EasyMock.createNiceMock(LoadQueuePeon.class);
loadManagementPeons = new MapMaker().makeMap();
serverInventoryView = EasyMock.createMock(ServerInventoryView.class);
serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);
databaseSegmentManager = EasyMock.createNiceMock(DatabaseSegmentManager.class);
EasyMock.replay(databaseSegmentManager);
@ -169,6 +170,20 @@ public class DruidMasterTest
EasyMock.expect(serverInventoryView.getInventoryValue("from")).andReturn(druidServer);
EasyMock.expect(serverInventoryView.getInventoryValue("to")).andReturn(druidServer);
EasyMock.expect(serverInventoryView.getInventoryManagerConfig()).andReturn(new InventoryManagerConfig()
{
@Override
public String getContainerPath()
{
return "";
}
@Override
public String getInventoryPath()
{
return "";
}
});
EasyMock.replay(serverInventoryView);
master.moveSegment("from", "to", "dummySegment", null);

View File

@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
<version>0.5.6-SNAPSHOT</version>
<version>0.5.20-SNAPSHOT</version>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.6-SNAPSHOT</version>
<version>0.5.20-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -3,4 +3,4 @@
#
# Script to upload tarball of assembly build to static.druid.io for serving
#
s3cmd put services/target/druid-services-*-bin.tar.gz s3://static.druid.io/artifacts/
s3cmd put services/target/druid-services-*-bin.tar.gz s3://static.druid.io/artifacts/releases/