mirror of https://github.com/apache/druid.git

Commit d2f8e52752: Merge remote-tracking branch 'upstream/master' into rabbitmq
.gitignore:
@@ -13,3 +13,4 @@ target
 examples/rand/RealtimeNode.out
 examples/twitter/RealtimeNode.out
 *.log
+*.DS_Store
QueryableNode.java:
@@ -39,15 +39,19 @@ import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer;
import com.metamx.druid.coordination.BatchingCuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.http.NoopRequestLogger;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
@@ -424,7 +428,20 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
       final Announcer announcer = new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s"));
       lifecycle.addManagedInstance(announcer);

-      setAnnouncer(new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper()));
+      setAnnouncer(
+          new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
+              Arrays.<AbstractDataSegmentAnnouncer>asList(
+                  new BatchingCuratorDataSegmentAnnouncer(
+                      getDruidServerMetadata(),
+                      getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class),
+                      announcer,
+                      getJsonMapper()
+                  ),
+                  new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper())
+              )
+          )
+      );

       lifecycle.addManagedInstance(getAnnouncer(), Lifecycle.Stage.LAST);
     }
   }
AbstractDataSegmentAnnouncer.java (new file):
@@ -0,0 +1,100 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.coordination;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths;

/**
 */
public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnouncer
{
  private static final Logger log = new Logger(AbstractDataSegmentAnnouncer.class);

  private final DruidServerMetadata server;
  private final ZkPathsConfig config;
  private final Announcer announcer;
  private final ObjectMapper jsonMapper;

  private final Object lock = new Object();

  private volatile boolean started = false;

  protected AbstractDataSegmentAnnouncer(
      DruidServerMetadata server,
      ZkPathsConfig config,
      Announcer announcer,
      ObjectMapper jsonMapper
  )
  {
    this.server = server;
    this.config = config;
    this.announcer = announcer;
    this.jsonMapper = jsonMapper;
  }

  @LifecycleStart
  public void start()
  {
    synchronized (lock) {
      if (started) {
        return;
      }

      try {
        final String path = makeAnnouncementPath();
        log.info("Announcing self[%s] at [%s]", server, path);
        announcer.announce(path, jsonMapper.writeValueAsBytes(server));
      }
      catch (JsonProcessingException e) {
        throw Throwables.propagate(e);
      }

      started = true;
    }
  }

  @LifecycleStop
  public void stop()
  {
    synchronized (lock) {
      if (!started) {
        return;
      }

      log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config);
      announcer.unannounce(makeAnnouncementPath());

      started = false;
    }
  }

  private String makeAnnouncementPath()
  {
    return ZKPaths.makePath(config.getAnnouncementsPath(), server.getName());
  }
}
BatchingCuratorDataSegmentAnnouncer.java (new file):
@@ -0,0 +1,297 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.coordination;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths;
import org.joda.time.DateTime;

import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 */
public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
{
  private static final Logger log = new Logger(BatchingCuratorDataSegmentAnnouncer.class);

  private final ZkDataSegmentAnnouncerConfig config;
  private final Announcer announcer;
  private final ObjectMapper jsonMapper;
  private final String liveSegmentLocation;

  private final Set<SegmentZNode> availableZNodes = Sets.newHashSet();
  private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap();

  public BatchingCuratorDataSegmentAnnouncer(
      DruidServerMetadata server,
      ZkDataSegmentAnnouncerConfig config,
      Announcer announcer,
      ObjectMapper jsonMapper
  )
  {
    super(server, config, announcer, jsonMapper);

    this.config = config;
    this.announcer = announcer;
    this.jsonMapper = jsonMapper;
    this.liveSegmentLocation = ZKPaths.makePath(config.getLiveSegmentsPath(), server.getName());
  }

  @Override
  public void announceSegment(DataSegment segment) throws IOException
  {
    int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
    if (newBytesLen > config.getMaxNumBytes()) {
      throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
    }

    // create new batch
    if (availableZNodes.isEmpty()) {
      SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
      availableZNode.addSegment(segment);

      log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
      announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
      segmentLookup.put(segment, availableZNode);
      availableZNodes.add(availableZNode);
    } else { // update existing batch
      Iterator<SegmentZNode> iter = availableZNodes.iterator();
      boolean done = false;
      while (iter.hasNext() && !done) {
        SegmentZNode availableZNode = iter.next();
        if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
          availableZNode.addSegment(segment);

          log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
          announcer.update(availableZNode.getPath(), availableZNode.getBytes());
          segmentLookup.put(segment, availableZNode);

          if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
            availableZNodes.remove(availableZNode);
          }

          done = true;
        }
      }
    }
  }

  @Override
  public void unannounceSegment(DataSegment segment) throws IOException
  {
    final SegmentZNode segmentZNode = segmentLookup.remove(segment);
    segmentZNode.removeSegment(segment);

    log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
    if (segmentZNode.getCount() == 0) {
      availableZNodes.remove(segmentZNode);
      announcer.unannounce(segmentZNode.getPath());
    } else {
      announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
      availableZNodes.add(segmentZNode);
    }
  }

  @Override
  public void announceSegments(Iterable<DataSegment> segments) throws IOException
  {
    SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
    Set<DataSegment> batch = Sets.newHashSet();
    int byteSize = 0;
    int count = 0;

    for (DataSegment segment : segments) {
      int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;

      if (newBytesLen > config.getMaxNumBytes()) {
        throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
      }

      if (count >= config.getSegmentsPerNode() || byteSize + newBytesLen > config.getMaxNumBytes()) {
        segmentZNode.addSegments(batch);
        announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());

        segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
        batch = Sets.newHashSet();
        count = 0;
        byteSize = 0;
      }

      log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
      segmentLookup.put(segment, segmentZNode);
      batch.add(segment);
      count++;
      byteSize += newBytesLen;
    }

    segmentZNode.addSegments(batch);
    announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
  }

  @Override
  public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
  {
    for (DataSegment segment : segments) {
      unannounceSegment(segment);
    }
  }

  private String makeServedSegmentPath(String zNode)
  {
    return ZKPaths.makePath(liveSegmentLocation, zNode);
  }

  private class SegmentZNode
  {
    private final String path;

    private byte[] bytes = new byte[]{};
    private int count = 0;

    public SegmentZNode(String path)
    {
      this.path = path;
    }

    public String getPath()
    {
      return path;
    }

    public int getCount()
    {
      return count;
    }

    public byte[] getBytes()
    {
      return bytes;
    }

    public Set<DataSegment> getSegments()
    {
      if (bytes.length == 0) {
        return Sets.newHashSet();
      }
      try {
        return jsonMapper.readValue(
            bytes, new TypeReference<Set<DataSegment>>()
            {
            }
        );
      }
      catch (Exception e) {
        throw Throwables.propagate(e);
      }
    }

    public void addSegment(DataSegment segment)
    {
      Set<DataSegment> zkSegments = getSegments();
      zkSegments.add(segment);

      try {
        bytes = jsonMapper.writeValueAsBytes(zkSegments);
      }
      catch (Exception e) {
        zkSegments.remove(segment);
        throw Throwables.propagate(e);
      }

      count++;
    }

    public void addSegments(Set<DataSegment> segments)
    {
      Set<DataSegment> zkSegments = getSegments();
      zkSegments.addAll(segments);

      try {
        bytes = jsonMapper.writeValueAsBytes(zkSegments);
      }
      catch (Exception e) {
        zkSegments.removeAll(segments);
        throw Throwables.propagate(e);
      }

      count += segments.size();
    }

    public void removeSegment(DataSegment segment)
    {
      Set<DataSegment> zkSegments = getSegments();
      zkSegments.remove(segment);

      try {
        bytes = jsonMapper.writeValueAsBytes(zkSegments);
      }
      catch (Exception e) {
        zkSegments.add(segment);
        throw Throwables.propagate(e);
      }

      count--;
    }

    @Override
    public boolean equals(Object o)
    {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      SegmentZNode that = (SegmentZNode) o;

      if (!path.equals(that.path)) {
        return false;
      }

      return true;
    }

    @Override
    public int hashCode()
    {
      return path.hashCode();
    }
  }
}
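For readers skimming the diff: the batching rule above bounds each segments znode by two limits from ZkDataSegmentAnnouncerConfig (count and serialized byte size). A minimal standalone sketch of that admission check follows; the class and method names here are illustrative and not part of the commit.

// Illustrative sketch only (not from the commit); the default limits are the
// @Default values declared on ZkDataSegmentAnnouncerConfig in this diff.
public class BatchAdmissionSketch
{
  static final int SEGMENTS_PER_NODE = 50;      // druid.zk.segmentsPerNode default
  static final long MAX_NUM_BYTES = 512000L;    // druid.zk.maxNumBytesPerNode default

  // A segment may join an existing batch znode only if both limits still hold.
  static boolean fits(int currentCount, long currentBytes, long newSegmentBytes)
  {
    return currentCount < SEGMENTS_PER_NODE
           && currentBytes + newSegmentBytes < MAX_NUM_BYTES;
  }

  public static void main(String[] args)
  {
    System.out.println(fits(49, 500000L, 15000L)); // false: would exceed the byte cap
    System.out.println(fits(49, 400000L, 15000L)); // true: both limits still hold
  }
}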
CuratorDataSegmentAnnouncer.java:
@@ -19,11 +19,7 @@
 package com.metamx.druid.coordination;

-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Throwables;
-import com.metamx.common.lifecycle.LifecycleStart;
-import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.curator.announcement.Announcer;

@@ -32,20 +28,14 @@ import org.apache.curator.utils.ZKPaths;
 import java.io.IOException;

-public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
+public class CuratorDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
 {
   private static final Logger log = new Logger(CuratorDataSegmentAnnouncer.class);

-  private final Object lock = new Object();
-
-  private final DruidServerMetadata server;
-  private final ZkPathsConfig config;
   private final Announcer announcer;
   private final ObjectMapper jsonMapper;
   private final String servedSegmentsLocation;

-  private volatile boolean started = false;
-
   public CuratorDataSegmentAnnouncer(
       DruidServerMetadata server,
       ZkPathsConfig config,

@@ -53,49 +43,13 @@ public class CuratorDataSegmentAnnouncer
       ObjectMapper jsonMapper
   )
   {
-    this.server = server;
-    this.config = config;
+    super(server, config, announcer, jsonMapper);
+
     this.announcer = announcer;
     this.jsonMapper = jsonMapper;
     this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsPath(), server.getName());
   }

-  @LifecycleStart
-  public void start()
-  {
-    synchronized (lock) {
-      if (started) {
-        return;
-      }
-
-      try {
-        final String path = makeAnnouncementPath();
-        log.info("Announcing self[%s] at [%s]", server, path);
-        announcer.announce(path, jsonMapper.writeValueAsBytes(server));
-      }
-      catch (JsonProcessingException e) {
-        throw Throwables.propagate(e);
-      }
-
-      started = true;
-    }
-  }
-
-  @LifecycleStop
-  public void stop()
-  {
-    synchronized (lock) {
-      if (!started) {
-        return;
-      }
-
-      log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config);
-      announcer.unannounce(makeAnnouncementPath());
-
-      started = false;
-    }
-  }
-
   public void announceSegment(DataSegment segment) throws IOException
   {
     final String path = makeServedSegmentPath(segment);

@@ -110,8 +64,20 @@ public class CuratorDataSegmentAnnouncer
     announcer.unannounce(path);
   }

-  private String makeAnnouncementPath() {
-    return ZKPaths.makePath(config.getAnnouncementsPath(), server.getName());
-  }
+  @Override
+  public void announceSegments(Iterable<DataSegment> segments) throws IOException
+  {
+    for (DataSegment segment : segments) {
+      announceSegment(segment);
+    }
+  }
+
+  @Override
+  public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
+  {
+    for (DataSegment segment : segments) {
+      unannounceSegment(segment);
+    }
+  }

   private String makeServedSegmentPath(DataSegment segment)
DataSegmentAnnouncer.java:
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
 package com.metamx.druid.coordination;

 import com.metamx.druid.client.DataSegment;

@@ -7,5 +26,10 @@ import java.io.IOException;
 public interface DataSegmentAnnouncer
 {
   public void announceSegment(DataSegment segment) throws IOException;

   public void unannounceSegment(DataSegment segment) throws IOException;
+
+  public void announceSegments(Iterable<DataSegment> segments) throws IOException;
+
+  public void unannounceSegments(Iterable<DataSegment> segments) throws IOException;
 }
DruidServerMetadata.java:
@@ -80,7 +80,7 @@ public class DruidServerMetadata
   @Override
   public String toString()
   {
-    return "DruidServer{" +
+    return "DruidServerMetadata{" +
            "name='" + name + '\'' +
            ", host='" + host + '\'' +
            ", maxSize=" + maxSize +
MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java (new file):
@@ -0,0 +1,92 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.coordination;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkPathsConfig;

import java.io.IOException;

/**
 * This class has the greatest name ever
 */
public class MultipleDataSegmentAnnouncerDataSegmentAnnouncer implements DataSegmentAnnouncer
{
  private final Iterable<AbstractDataSegmentAnnouncer> dataSegmentAnnouncers;

  public MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
      Iterable<AbstractDataSegmentAnnouncer> dataSegmentAnnouncers
  )
  {
    this.dataSegmentAnnouncers = dataSegmentAnnouncers;
  }

  @LifecycleStart
  public void start()
  {
    for (AbstractDataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
      dataSegmentAnnouncer.start();
    }
  }

  @LifecycleStop
  public void stop()
  {
    for (AbstractDataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
      dataSegmentAnnouncer.stop();
    }
  }

  @Override
  public void announceSegment(DataSegment segment) throws IOException
  {
    for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
      dataSegmentAnnouncer.announceSegment(segment);
    }
  }

  @Override
  public void unannounceSegment(DataSegment segment) throws IOException
  {
    for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
      dataSegmentAnnouncer.unannounceSegment(segment);
    }
  }

  @Override
  public void announceSegments(Iterable<DataSegment> segments) throws IOException
  {
    for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
      dataSegmentAnnouncer.announceSegments(segments);
    }
  }

  @Override
  public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
  {
    for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
      dataSegmentAnnouncer.unannounceSegments(segments);
    }
  }
}
Announcer.java:
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
 package com.metamx.druid.curator.announcement;

 import com.google.common.base.Throwables;

@@ -6,6 +25,7 @@ import com.google.common.collect.MapMaker;
 import com.google.common.collect.Sets;
 import com.google.common.io.Closeables;
 import com.metamx.common.IAE;
+import com.metamx.common.ISE;
 import com.metamx.common.Pair;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;

@@ -21,7 +41,9 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;

+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;

@@ -99,7 +121,7 @@ public class Announcer
    * Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node
    * and monitor it to make sure that it always exists until it is unannounced or this object is closed.
    *
-   * @param path The path to announce at
+   * @param path  The path to announce at
    * @param bytes The payload to announce
    */
   public void announce(String path, byte[] bytes)

@@ -127,7 +149,7 @@ public class Announcer
     // Synchronize to make sure that I only create a listener once.
     synchronized (finalSubPaths) {
-      if (! listeners.containsKey(parentPath)) {
+      if (!listeners.containsKey(parentPath)) {
         final PathChildrenCache cache = factory.make(curator, parentPath);
         cache.getListenable().addListener(
             new PathChildrenCacheListener()

@@ -208,11 +230,11 @@ public class Announcer
       if (started) {
         byte[] oldBytes = subPaths.putIfAbsent(pathAndNode.getNode(), bytes);

-        if (oldBytes != null) {
-          throw new IAE("Already announcing[%s], cannot announce it twice.", path);
+        if (oldBytes == null) {
+          created = true;
+        } else if (!Arrays.equals(oldBytes, bytes)) {
+          throw new IAE("Cannot reannounce different values under the same path");
         }
-
-        created = true;
       }
     }

@@ -226,15 +248,48 @@ public class Announcer
     }
   }

+  public void update(final String path, final byte[] bytes)
+  {
+    final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+
+    final String parentPath = pathAndNode.getPath();
+    final String nodePath = pathAndNode.getNode();
+
+    ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
+
+    if (subPaths == null || subPaths.get(nodePath) == null) {
+      throw new ISE("Cannot update a path[%s] that hasn't been announced!", path);
+    }
+
+    synchronized (subPaths) {
+      try {
+        byte[] oldBytes = subPaths.get(nodePath);
+
+        if (!Arrays.equals(oldBytes, bytes)) {
+          subPaths.put(nodePath, bytes);
+          updateAnnouncement(path, bytes);
+        }
+      }
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+
   private String createAnnouncement(final String path, byte[] value) throws Exception
   {
     return curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path, value);
   }

+  private Stat updateAnnouncement(final String path, final byte[] value) throws Exception
+  {
+    return curator.setData().compressed().inBackground().forPath(path, value);
+  }
+
   /**
    * Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer
    * will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions.
    * <p/>
    * If you need to completely clear all the state of what is being watched and announced, stop() the Announcer.
    *
    * @param path
|
|||
package com.metamx.druid.initialization;
|
||||
|
||||
import org.skife.config.Config;
|
||||
import org.skife.config.Default;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class ZkDataSegmentAnnouncerConfig extends ZkPathsConfig
|
||||
{
|
||||
@Config("druid.zk.segmentsPerNode")
|
||||
@Default("50")
|
||||
public abstract int getSegmentsPerNode();
|
||||
|
||||
@Config("druid.zk.maxNumBytesPerNode")
|
||||
@Default("512000")
|
||||
public abstract long getMaxNumBytes();
|
||||
}
|
|
ZkPathsConfig.java:
@@ -21,6 +21,7 @@ package com.metamx.druid.initialization;

 import org.apache.curator.utils.ZKPaths;
 import org.skife.config.Config;
+import org.skife.config.Default;

 public abstract class ZkPathsConfig
 {

@@ -45,6 +46,12 @@ public abstract class ZkPathsConfig
     return defaultPath("servedSegments");
   }

+  @Config("druid.zk.paths.liveSegmentsPath")
+  public String getLiveSegmentsPath()
+  {
+    return defaultPath("segments");
+  }
+
   @Config("druid.zk.paths.loadQueuePath")
   public String getLoadQueuePath()
   {
BatchingCuratorDataSegmentAnnouncerTest.java (new file):
@@ -0,0 +1,241 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.coordination;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
 */
public class BatchingCuratorDataSegmentAnnouncerTest
{
  private static final String testBasePath = "/test";
  private static final String testSegmentsPath = "/test/segments/id";
  private static final Joiner joiner = Joiner.on("/");

  private TestingCluster testingCluster;
  private CuratorFramework cf;
  private ObjectMapper jsonMapper;
  private Announcer announcer;
  private SegmentReader segmentReader;
  private BatchingCuratorDataSegmentAnnouncer segmentAnnouncer;
  private Set<DataSegment> testSegments;

  @Before
  public void setUp() throws Exception
  {
    testingCluster = new TestingCluster(1);
    testingCluster.start();

    cf = CuratorFrameworkFactory.builder()
                                .connectString(testingCluster.getConnectString())
                                .retryPolicy(new ExponentialBackoffRetry(1, 10))
                                .compressionProvider(new PotentiallyGzippedCompressionProvider(false))
                                .build();
    cf.start();
    cf.create().creatingParentsIfNeeded().forPath(testBasePath);

    jsonMapper = new DefaultObjectMapper();

    announcer = new Announcer(
        cf,
        MoreExecutors.sameThreadExecutor()
    );
    announcer.start();

    segmentReader = new SegmentReader(cf, jsonMapper);
    segmentAnnouncer = new BatchingCuratorDataSegmentAnnouncer(
        new DruidServerMetadata(
            "id",
            "host",
            Long.MAX_VALUE,
            "type",
            "tier"
        ),
        new ZkDataSegmentAnnouncerConfig()
        {
          @Override
          public String getZkBasePath()
          {
            return testBasePath;
          }

          @Override
          public int getSegmentsPerNode()
          {
            return 50;
          }

          @Override
          public long getMaxNumBytes()
          {
            return 100000;
          }
        },
        announcer,
        jsonMapper
    );
    segmentAnnouncer.start();

    testSegments = Sets.newHashSet();
    for (int i = 0; i < 100; i++) {
      testSegments.add(makeSegment(i));
    }
  }

  @After
  public void tearDown() throws Exception
  {
    segmentAnnouncer.stop();
    announcer.stop();
    cf.close();
    testingCluster.stop();
  }

  @Test
  public void testSingleAnnounce() throws Exception
  {
    Iterator<DataSegment> segIter = testSegments.iterator();
    DataSegment firstSegment = segIter.next();
    DataSegment secondSegment = segIter.next();

    segmentAnnouncer.announceSegment(firstSegment);

    List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);

    for (String zNode : zNodes) {
      Set<DataSegment> segments = segmentReader.read(joiner.join(testSegmentsPath, zNode));
      Assert.assertEquals(segments.iterator().next(), firstSegment);
    }

    segmentAnnouncer.announceSegment(secondSegment);

    for (String zNode : zNodes) {
      Set<DataSegment> segments = segmentReader.read(joiner.join(testSegmentsPath, zNode));
      Assert.assertEquals(Sets.newHashSet(firstSegment, secondSegment), segments);
    }

    segmentAnnouncer.unannounceSegment(firstSegment);

    for (String zNode : zNodes) {
      Set<DataSegment> segments = segmentReader.read(joiner.join(testSegmentsPath, zNode));
      Assert.assertEquals(segments.iterator().next(), secondSegment);
    }

    segmentAnnouncer.unannounceSegment(secondSegment);

    Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
  }

  @Test
  public void testBatchAnnounce() throws Exception
  {
    segmentAnnouncer.announceSegments(testSegments);

    List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);

    Assert.assertTrue(zNodes.size() == 2);

    Set<DataSegment> allSegments = Sets.newHashSet();
    for (String zNode : zNodes) {
      allSegments.addAll(segmentReader.read(joiner.join(testSegmentsPath, zNode)));
    }
    Assert.assertEquals(allSegments, testSegments);

    segmentAnnouncer.unannounceSegments(testSegments);

    Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
  }

  @Test
  public void testMultipleBatchAnnounce() throws Exception
  {
    for (int i = 0; i < 10; i++) {
      testBatchAnnounce();
    }
  }

  private DataSegment makeSegment(int offset)
  {
    return DataSegment.builder()
                      .dataSource("foo")
                      .interval(
                          new Interval(
                              new DateTime("2013-01-01").plusDays(offset),
                              new DateTime("2013-01-02").plusDays(offset)
                          )
                      )
                      .version(new DateTime().toString())
                      .build();
  }

  private class SegmentReader
  {
    private final CuratorFramework cf;
    private final ObjectMapper jsonMapper;

    public SegmentReader(CuratorFramework cf, ObjectMapper jsonMapper)
    {
      this.cf = cf;
      this.jsonMapper = jsonMapper;
    }

    public Set<DataSegment> read(String path)
    {
      try {
        if (cf.checkExists().forPath(path) != null) {
          return jsonMapper.readValue(
              cf.getData().forPath(path), new TypeReference<Set<DataSegment>>()
              {
              }
          );
        }
      }
      catch (Exception e) {
        throw Throwables.propagate(e);
      }

      return Sets.newHashSet();
    }
  }
}
CuratorInventoryManagerTest.java:
@@ -43,14 +43,11 @@ public class CuratorInventoryManagerTest extends CuratorTestBase
     curator.start();
     manager.start();

-    curator.create().creatingParentsIfNeeded().forPath("/container");
-    curator.create().creatingParentsIfNeeded().forPath("/inventory/billy");
-
     Assert.assertTrue(Iterables.isEmpty(manager.getInventory()));

     CountDownLatch containerLatch = new CountDownLatch(1);
     strategy.setNewContainerLatch(containerLatch);
-    curator.create().withMode(CreateMode.EPHEMERAL).forPath("/container/billy", new byte[]{});
+    curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/container/billy", new byte[]{});

     Assert.assertTrue(timing.awaitLatch(containerLatch));
     strategy.setNewContainerLatch(null);

@@ -60,7 +57,7 @@ public class CuratorInventoryManagerTest extends CuratorTestBase
     CountDownLatch inventoryLatch = new CountDownLatch(2);
     strategy.setNewInventoryLatch(inventoryLatch);
-    curator.create().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/1", Ints.toByteArray(100));
+    curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/1", Ints.toByteArray(100));
     curator.create().withMode(CreateMode.EPHEMERAL).forPath("/inventory/billy/bob", Ints.toByteArray(2287));

     Assert.assertTrue(timing.awaitLatch(inventoryLatch));
Runnables.java:
@@ -34,4 +34,10 @@ public class Runnables
       }
     };
   }
+
+  public static Runnable getNoopRunnable(){
+    return new Runnable(){
+      public void run(){}
+    };
+  }
 }
ec2/env.sh (new file):
@@ -0,0 +1,27 @@
# Setup Oracle Java
sudo apt-get update
sudo add-apt-repository -y ppa:webupd8team/java
sudo apt-get update

# Setup yes answer to license question
echo debconf shared/accepted-oracle-license-v1-1 select true | sudo debconf-set-selections
echo debconf shared/accepted-oracle-license-v1-1 seen true | sudo debconf-set-selections
sudo apt-get -y -q install oracle-java7-installer

# Automated Kafka setup
curl http://static.druid.io/artifacts/kafka-0.7.2-incubating-bin.tar.gz -o /tmp/kafka-0.7.2-incubating-bin.tar.gz
tar -xvzf /tmp/kafka-0.7.2-incubating-bin.tar.gz
cd kafka-0.7.2-incubating-bin
cat config/zookeeper.properties
nohup bin/zookeeper-server-start.sh config/zookeeper.properties 2>&1 > /dev/null &
# in a new console
nohup bin/kafka-server-start.sh config/server.properties 2>&1 > /dev/null &

# Install dependencies - mysql must be built from source, as the 12.04 apt-get hangs
export DEBIAN_FRONTEND=noninteractive
sudo debconf-set-selections <<< 'mysql-server-5.5 mysql-server/root_password password diurd'
sudo debconf-set-selections <<< 'mysql-server-5.5 mysql-server/root_password_again password diurd'
sudo apt-get -q -y -V --force-yes --reinstall install mysql-server-5.5

echo "ALL DONE with druid environment setup! Hit CTRL-C to proceed."
exit 0
ec2/run.sh (new file):
@@ -0,0 +1,22 @@
# Is localhost expected with multi-node?
mysql -u root -pdiurd -e "GRANT ALL ON druid.* TO 'druid'@'localhost' IDENTIFIED BY 'diurd'; CREATE database druid;" 2>&1 > /dev/null

tar -xvzf druid-services-*-bin.tar.gz 2>&1 > /dev/null
cd druid-services-* 2>&1 > /dev/null

mkdir logs 2>&1 > /dev/null

# Now start a realtime node
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=config/realtime/realtime.spec -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/realtime com.metamx.druid.realtime.RealtimeMain 2>&1 > logs/realtime.log &

# And a master node
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/master com.metamx.druid.http.MasterMain 2>&1 > logs/master.log &

# And a compute node
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/compute com.metamx.druid.http.ComputeMain 2>&1 > logs/compute.log &

# And a broker node
nohup java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/druid-services-0.5.5-SNAPSHOT-selfcontained.jar:config/broker com.metamx.druid.http.BrokerMain 2>&1 > logs/broker.log &

echo "Hit CTRL-C to continue..."
exit 0
New file (example webstream groupBy query JSON):
@@ -0,0 +1,12 @@
{
    "queryType": "groupBy",
    "dataSource": "webstream",
    "granularity": "all",
    "dimensions": ["country"],
    "aggregations": [
        { "type": "count", "name": "rows" },
        { "type": "doubleSum", "fieldName": "known_users", "name": "known_users" }
    ],
    "filter": { "type": "selector", "dimension": "geo_region", "value": "CA" },
    "intervals": ["2012-10-01T00:00/2020-01-01T00"]
}
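Such a query can presumably be POSTed to a running broker node. A minimal JDK-only sketch follows; the endpoint path /druid/v2/?pretty, the port (localhost:8080), and the file name are assumptions, not part of this commit.

// Hypothetical client for the groupBy query above; endpoint and port are assumed.
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;

public class PostQuerySketch
{
  public static void main(String[] args) throws Exception
  {
    // Read the query JSON shown above from a local file (hypothetical name).
    byte[] query = Files.readAllBytes(Paths.get("group_by_query.json"));

    HttpURLConnection conn =
        (HttpURLConnection) new URL("http://localhost:8080/druid/v2/?pretty").openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    try (OutputStream os = conn.getOutputStream()) {
      os.write(query); // send the query body
    }
    System.out.println("HTTP " + conn.getResponseCode());
  }
}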
New file (example webstream realtime.spec):
@@ -0,0 +1,47 @@
[{
  "schema": {
    "dataSource": "webstream",
    "aggregators": [
      {"type": "count", "name": "rows"},
      {"type": "doubleSum", "fieldName": "known_users", "name": "known_users"}
    ],
    "indexGranularity": "minute",
    "shardSpec": {"type": "none"}
  },

  "config": {
    "maxRowsInMemory": 50000,
    "intermediatePersistPeriod": "PT2m"
  },

  "firehose": {
    "type": "webstream",
    "url": "http://developer.usa.gov/1usagov",
    "renamedDimensions": {
      "g": "bitly_hash",
      "c": "country",
      "a": "user",
      "cy": "city",
      "l": "encoding_user_login",
      "hh": "short_url",
      "hc": "timestamp_hash",
      "h": "user_bitly_hash",
      "u": "url",
      "tz": "timezone",
      "t": "time",
      "r": "referring_url",
      "gr": "geo_region",
      "nk": "known_users",
      "al": "accept_language"
    },
    "timeDimension": "t",
    "timeFormat": "posix"
  },

  "plumber": {
    "type": "realtime",
    "windowPeriod": "PT3m",
    "segmentGranularity": "hour",
    "basePersistDirectory": "/tmp/example/usagov_realtime/basePersist"
  }
}]
New file (EC2 cluster bootstrap script):
@@ -0,0 +1,76 @@
# Before running, you will need to download the EC2 tools from http://aws.amazon.com/developertools/351
# and then setup your EC2_HOME and PATH variables (or similar):
#
# # Setup environment for ec2-api-tools
# export EC2_HOME=/path/to/ec2-api-tools-1.6.7.4/
# export PATH=$PATH:$EC2_HOME/bin
# export AWS_ACCESS_KEY=
# export AWS_SECRET_KEY=

# Check for ec2 commands we require and die if they're missing
type ec2-create-keypair >/dev/null 2>&1 || { echo >&2 "I require ec2-create-keypair but it's not installed. Aborting."; exit 1; }
type ec2-create-group >/dev/null 2>&1 || { echo >&2 "I require ec2-create-group but it's not installed. Aborting."; exit 1; }
type ec2-authorize >/dev/null 2>&1 || { echo >&2 "I require ec2-authorize but it's not installed. Aborting."; exit 1; }
type ec2-run-instances >/dev/null 2>&1 || { echo >&2 "I require ec2-run-instances but it's not installed. Aborting."; exit 1; }
type ec2-describe-instances >/dev/null 2>&1 || { echo >&2 "I require ec2-describe-instances but it's not installed. Aborting."; exit 1; }

# Create a keypair for our servers
echo "Removing old keypair for druid..."
ec2-delete-keypair druid-keypair
echo "Creating new keypair for druid..."
ec2-create-keypair druid-keypair > druid-keypair
chmod 0600 druid-keypair
mv druid-keypair ~/.ssh/

# Create a security group for our servers
echo "Creating a new security group for druid..."
ec2-create-group druid-group -d "Druid Cluster"

# Create rules that allow necessary services in our group
echo "Creating new firewall rules for druid..."
# SSH from outside
ec2-authorize druid-group -P tcp -p 22
# Enable all traffic within group
ec2-authorize druid-group -P tcp -p 1-65535 -o druid-group
ec2-authorize druid-group -P udp -p 1-65535 -o druid-group

echo "Booting a single small instance for druid..."
# Use ami ami-e7582d8e - Alestic Ubuntu 12.04 us-east
INSTANCE_ID=$(ec2-run-instances ami-e7582d8e -n 1 -g druid-group -k druid-keypair --instance-type m1.small| awk '/INSTANCE/{print $2}')
while true; do
  sleep 1
  INSTANCE_STATUS=$(ec2-describe-instances|grep INSTANCE|grep $INSTANCE_ID|cut -f6)
  if [ $INSTANCE_STATUS == "running" ]
  then
    echo "Instance $INSTANCE_ID is status $INSTANCE_STATUS..."
    break
  fi
done

# Wait for the instance to come up
echo "Waiting 60 seconds for instance $INSTANCE_ID to boot..."
sleep 60

# Get hostname and ssh with the key we created, and ssh there
INSTANCE_ADDRESS=`ec2-describe-instances|grep 'INSTANCE'|grep $INSTANCE_ID|cut -f4`
echo "Connecting to $INSTANCE_ADDRESS to prepare environment for druid..."
scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ./ec2/env.sh ubuntu@${INSTANCE_ADDRESS}:
ssh -q -f -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./env.sh;./env.sh'

echo "Prepared $INSTANCE_ADDRESS for druid."

# Now to scp a tarball up that can run druid!
if [ -f ../../services/target/druid-services-*-SNAPSHOT-bin.tar.gz ];
then
  echo "Uploading druid tarball to server..."
  scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ../../services/target/druid-services-*-bin.tar.gz ubuntu@${INSTANCE_ADDRESS}:
else
  echo "ERROR - package not built!"
fi

# Now boot druid parts
scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ./ec2/run.sh ubuntu@${INSTANCE_ADDRESS}:
ssh -q -f -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./run.sh;./run.sh'

echo "Druid booting complete!"
echo "ssh -i ~/.ssh/druid-keypair ubuntu@${INSTANCE_ADDRESS} #to connect"
New file (example runtime.properties, druid.port=8083):
@@ -0,0 +1,43 @@
druid.host=127.0.0.1
druid.port=8083

com.metamx.emitter.logging=true

druid.processing.formatString=processing_%s
druid.processing.numThreads=1
druid.processing.buffer.sizeBytes=10000000

#emitting, opaque marker
druid.service=example

druid.request.logging.dir=/tmp/example/log
druid.realtime.specFile=realtime.spec
com.metamx.emitter.logging=true
com.metamx.emitter.logging.level=info

# below are dummy values when operating a realtime only node
com.metamx.aws.accessKey=dummy_access_key
com.metamx.aws.secretKey=dummy_secret_key
druid.pusher.s3.bucket=dummy_s3_bucket

druid.zk.service.host=localhost
druid.server.maxSize=300000000000
druid.zk.paths.base=/druid
druid.database.segmentTable=prod_segments
druid.database.user=druid
druid.database.password=diurd
druid.database.connectURI=jdbc:mysql://localhost:3306/druid
druid.zk.paths.discoveryPath=/druid/discoveryPath
druid.database.ruleTable=rules
druid.database.configTable=config

# Path on local FS for storage of segments; dir will be created if needed
druid.paths.indexCache=/tmp/druid/indexCache
# Path on local FS for storage of segment metadata; dir will be created if needed
druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache
druid.pusher.local.storageDirectory=/tmp/druid/localStorage
druid.pusher.local=true

# thread pool size for servicing queries
druid.client.http.connections=30
druid.host=127.0.0.1:8083
New file (example runtime.properties, druid.port=8082):
@@ -0,0 +1,39 @@
druid.host=127.0.0.1
druid.port=8082

com.metamx.emitter.logging=true

druid.processing.formatString=processing_%s
druid.processing.numThreads=1
druid.processing.buffer.sizeBytes=10000000

#emitting, opaque marker
druid.service=example

druid.request.logging.dir=/tmp/example/log
druid.realtime.specFile=realtime.spec
com.metamx.emitter.logging=true
com.metamx.emitter.logging.level=info

# below are dummy values when operating a realtime only node
com.metamx.aws.accessKey=dummy_access_key
com.metamx.aws.secretKey=dummy_secret_key
druid.pusher.s3.bucket=dummy_s3_bucket

druid.zk.service.host=localhost
druid.server.maxSize=300000000000
druid.zk.paths.base=/druid
druid.database.segmentTable=prod_segments
druid.database.user=druid
druid.database.password=diurd
druid.database.connectURI=jdbc:mysql://localhost:3306/druid
druid.zk.paths.discoveryPath=/druid/discoveryPath
druid.database.ruleTable=rules
druid.database.configTable=config

# Path on local FS for storage of segments; dir will be created if needed
druid.paths.indexCache=/tmp/druid/indexCache
# Path on local FS for storage of segment metadata; dir will be created if needed
druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache
druid.pusher.local.storageDirectory=/tmp/druid/localStorage
druid.pusher.local=true
New file (example runtime.properties, druid.port=8081):
@@ -0,0 +1,39 @@
druid.host=127.0.0.1
druid.port=8081

com.metamx.emitter.logging=true

druid.processing.formatString=processing_%s
druid.processing.numThreads=1
druid.processing.buffer.sizeBytes=10000000

#emitting, opaque marker
druid.service=example

druid.request.logging.dir=/tmp/example/log
druid.realtime.specFile=realtime.spec
com.metamx.emitter.logging=true
com.metamx.emitter.logging.level=info

# below are dummy values when operating a realtime only node
com.metamx.aws.accessKey=dummy_access_key
com.metamx.aws.secretKey=dummy_secret_key
druid.pusher.s3.bucket=dummy_s3_bucket

druid.zk.service.host=localhost
druid.server.maxSize=300000000000
druid.zk.paths.base=/druid
druid.database.segmentTable=prod_segments
druid.database.user=druid
druid.database.password=diurd
druid.database.connectURI=jdbc:mysql://localhost:3306/druid
druid.zk.paths.discoveryPath=/druid/discoveryPath
druid.database.ruleTable=rules
druid.database.configTable=config

# Path on local FS for storage of segments; dir will be created if needed
druid.paths.indexCache=/tmp/druid/indexCache
# Path on local FS for storage of segment metadata; dir will be created if needed
druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache
druid.pusher.local.storageDirectory=/tmp/druid/localStorage
druid.pusher.local=true
New file (example Kafka realtime.spec):
@@ -0,0 +1,29 @@
[{
  "schema" : { "dataSource":"druidtest",
               "aggregators":[ {"type":"count", "name":"impressions"},
                               {"type":"doubleSum","name":"wp","fieldName":"wp"}],
               "indexGranularity":"minute",
               "shardSpec" : { "type": "none" } },
  "config" : { "maxRowsInMemory" : 500000,
               "intermediatePersistPeriod" : "PT10m" },
  "firehose" : { "type" : "kafka-0.7.2",
                 "consumerProps" : { "zk.connect" : "localhost:2181",
                                     "zk.connectiontimeout.ms" : "15000",
                                     "zk.sessiontimeout.ms" : "15000",
                                     "zk.synctime.ms" : "5000",
                                     "groupid" : "topic-pixel-local",
                                     "fetch.size" : "1048586",
                                     "autooffset.reset" : "largest",
                                     "autocommit.enable" : "false" },
                 "feed" : "druidtest",
                 "parser" : { "timestampSpec" : { "column" : "utcdt", "format" : "iso" },
                              "data" : { "format" : "json" },
                              "dimensionExclusions" : ["wp"] } },
  "plumber" : { "type" : "realtime",
                "windowPeriod" : "PT10m",
                "segmentGranularity":"hour",
                "basePersistDirectory" : "/tmp/realtime/basePersist",
                "rejectionPolicy": {"type": "messageTime"} }
}]
New file (example runtime.properties, druid.port=8080):
@@ -0,0 +1,41 @@
druid.host=127.0.0.1
druid.port=8080

com.metamx.emitter.logging=true

druid.processing.formatString=processing_%s
druid.processing.numThreads=1
druid.processing.buffer.sizeBytes=10000000

#emitting, opaque marker
druid.service=example

druid.request.logging.dir=/tmp/example/log
druid.realtime.specFile=realtime.spec
com.metamx.emitter.logging=true
com.metamx.emitter.logging.level=info

# below are dummy values when operating a realtime only node
com.metamx.aws.accessKey=dummy_access_key
com.metamx.aws.secretKey=dummy_secret_key
druid.pusher.s3.bucket=dummy_s3_bucket

druid.zk.service.host=localhost
druid.server.maxSize=300000000000
druid.zk.paths.base=/druid
druid.database.segmentTable=prod_segments
druid.database.user=druid
druid.database.password=diurd
druid.database.connectURI=jdbc:mysql://localhost:3306/druid
druid.zk.paths.discoveryPath=/druid/discoveryPath
druid.database.ruleTable=rules
druid.database.configTable=config

# Path on local FS for storage of segments; dir will be created if needed
druid.paths.indexCache=/tmp/druid/indexCache
# Path on local FS for storage of segment metadata; dir will be created if needed
druid.paths.segmentInfoCache=/tmp/druid/segmentInfoCache
druid.pusher.local.storageDirectory=/tmp/druid/localStorage
druid.pusher.local=true

druid.host=127.0.0.1:8080
druid-examples pom.xml:
@@ -1,5 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.metamx.druid</groupId>
     <artifactId>druid-examples</artifactId>
@@ -16,6 +16,7 @@ import com.metamx.druid.realtime.SegmentPublisher;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;

import java.io.File;
import java.io.IOException;
@@ -41,7 +42,9 @@ public class RealtimeStandaloneMain
    rn.registerJacksonSubtype(
        new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
        new NamedType(FlightsFirehoseFactory.class, "flights"),
        new NamedType(RandomFirehoseFactory.class, "rand")
        new NamedType(RandomFirehoseFactory.class, "rand"),
        new NamedType(WebFirehoseFactory.class, "webstream")

    );

    // Create dummy objects for the various interfaces that interact with the DB, ZK and deep storage
@@ -50,7 +53,7 @@ public class RealtimeStandaloneMain
    rn.setDataSegmentPusher(new NoopDataSegmentPusher());
    rn.setServerView(new NoopServerView());
    rn.setInventoryView(new NoopInventoryView());


    Runtime.getRuntime().addShutdownHook(
        new Thread(
            new Runnable()
@@ -142,5 +145,17 @@ public class RealtimeStandaloneMain
          {
            // do nothing
          }

          @Override
          public void announceSegments(Iterable<DataSegment> segments) throws IOException
          {
            // do nothing
          }

          @Override
          public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
          {
            // do nothing
          }
        }
    }
@@ -0,0 +1,133 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package druid.examples.web;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.InputSupplier;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.emitter.EmittingLogger;

import java.io.BufferedReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class InputSupplierUpdateStream implements UpdateStream
{
  private static final EmittingLogger log = new EmittingLogger(InputSupplierUpdateStream.class);
  private static final long queueWaitTime = 15L;
  private final TypeReference<HashMap<String, Object>> typeRef;
  private final InputSupplier<BufferedReader> supplier;
  private final int QUEUE_SIZE = 10000;
  private final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
  private final ObjectMapper mapper = new DefaultObjectMapper();
  private final String timeDimension;
  private final Thread addToQueueThread;

  public InputSupplierUpdateStream(
      final InputSupplier<BufferedReader> supplier,
      final String timeDimension
  )
  {
    addToQueueThread = new Thread()
    {
      public void run()
      {
        while (!isInterrupted()) {
          try {
            BufferedReader reader = supplier.getInput();
            String line;
            while ((line = reader.readLine()) != null) {
              if (isValid(line)) {
                HashMap<String, Object> map = mapper.readValue(line, typeRef);
                if (map.get(timeDimension) != null) {
                  queue.offer(map, queueWaitTime, TimeUnit.SECONDS);
                  log.debug("Successfully added to queue");
                } else {
                  log.info("missing timestamp");
                }
              }
            }
          }
          catch (InterruptedException e) {
            log.info(e, "Thread adding events to the queue interrupted");
            return;
          }
          catch (JsonMappingException e) {
            log.info(e, "Error in converting json to map");
          }
          catch (JsonParseException e) {
            log.info(e, "Error in parsing json");
          }
          catch (IOException e) {
            log.info(e, "Error in connecting to InputStream");
          }
        }
      }
    };
    addToQueueThread.setDaemon(true);

    this.supplier = supplier;
    this.typeRef = new TypeReference<HashMap<String, Object>>()
    {
    };
    this.timeDimension = timeDimension;
  }

  private boolean isValid(String s)
  {
    return !(s.isEmpty());
  }

  public void start()
  {
    addToQueueThread.start();
  }

  public void stop()
  {
    addToQueueThread.interrupt();
  }

  public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
  {
    return queue.poll(waitTime, unit);
  }

  public int getQueueSize()
  {
    return queue.size();
  }

  public String getTimeDimension()
  {
    return timeDimension;
  }
}
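A minimal usage sketch for the class above (not part of the commit); the event fields are made up, the supplier is an inline stand-in for WebJsonSupplier, and the imports are as in the class plus java.io.StringReader:

InputSupplier<BufferedReader> oneLine = new InputSupplier<BufferedReader>()
{
  @Override
  public BufferedReader getInput() throws IOException
  {
    // one JSON event per line, as the read loop above expects
    return new BufferedReader(new StringReader("{\"item1\":\"value1\",\"time\":1372121562}"));
  }
};

InputSupplierUpdateStream stream = new InputSupplierUpdateStream(oneLine, "time");
stream.start();                                                        // daemon thread parses and enqueues
Map<String, Object> event = stream.pollFromQueue(5, TimeUnit.SECONDS); // parsed map, or null on timeout
stream.stop();                                                         // interrupts the reader thread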
@@ -0,0 +1,42 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package druid.examples.web;

import com.google.common.io.InputSupplier;

import java.io.BufferedReader;

public class InputSupplierUpdateStreamFactory implements UpdateStreamFactory
{
  private final InputSupplier<BufferedReader> inputSupplier;
  private final String timeDimension;

  public InputSupplierUpdateStreamFactory(InputSupplier<BufferedReader> inputSupplier, String timeDimension)
  {
    this.inputSupplier = inputSupplier;
    this.timeDimension = timeDimension;
  }

  public InputSupplierUpdateStream build()
  {
    return new InputSupplierUpdateStream(inputSupplier, timeDimension);
  }
}
@@ -0,0 +1,81 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package druid.examples.web;

import com.google.common.collect.Maps;

import java.util.Map;
import java.util.concurrent.TimeUnit;

public class RenamingKeysUpdateStream implements UpdateStream
{

  private final InputSupplierUpdateStream updateStream;
  private Map<String, String> renamedDimensions;

  public RenamingKeysUpdateStream(
      InputSupplierUpdateStream updateStream,
      Map<String, String> renamedDimensions
  )
  {
    this.renamedDimensions = renamedDimensions;
    this.updateStream = updateStream;
  }

  public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
  {
    return renameKeys(updateStream.pollFromQueue(waitTime, unit));
  }

  private Map<String, Object> renameKeys(Map<String, Object> update)
  {
    if (renamedDimensions != null) {
      Map<String, Object> renamedMap = Maps.newHashMap();
      for (String key : renamedDimensions.keySet()) {
        if (update.get(key) != null) {
          Object obj = update.get(key);
          renamedMap.put(renamedDimensions.get(key), obj);
        }
      }
      return renamedMap;
    } else {
      return update;
    }
  }

  public String getTimeDimension()
  {
    if (renamedDimensions != null && renamedDimensions.get(updateStream.getTimeDimension()) != null) {
      return renamedDimensions.get(updateStream.getTimeDimension());
    }
    return updateStream.getTimeDimension();
  }

  public void start()
  {
    updateStream.start();
  }

  public void stop()
  {
    updateStream.stop();
  }
}
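A behavioral sketch (not in the commit), reusing the stream from the previous sketch: renameKeys copies only keys that appear in the rename map, so unmapped fields are dropped, and getTimeDimension() reports the renamed timestamp key.

Map<String, String> renames = new HashMap<String, String>();
renames.put("item1", "i1");
renames.put("time", "t");

RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(stream, renames);
renamer.start();
// {"item1":"value1","time":1372121562} -> {"i1":"value1","t":1372121562}
Map<String, Object> renamed = renamer.pollFromQueue(5, TimeUnit.SECONDS);
String timeKey = renamer.getTimeDimension(); // "t"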
@@ -0,0 +1,39 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */
package druid.examples.web;

import java.util.Map;

public class RenamingKeysUpdateStreamFactory implements UpdateStreamFactory
{
  private InputSupplierUpdateStreamFactory updateStreamFactory;
  private Map<String, String> renamedDimensions;

  public RenamingKeysUpdateStreamFactory(InputSupplierUpdateStreamFactory updateStreamFactory, Map<String, String> renamedDimensions)
  {
    this.updateStreamFactory = updateStreamFactory;
    this.renamedDimensions = renamedDimensions;
  }

  public RenamingKeysUpdateStream build()
  {
    return new RenamingKeysUpdateStream(updateStreamFactory.build(), renamedDimensions);
  }
}
@@ -0,0 +1,31 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */
package druid.examples.web;

import java.util.Map;
import java.util.concurrent.TimeUnit;

public interface UpdateStream
{
  public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException;
  public String getTimeDimension();
  public void start();
  public void stop();

}
@@ -0,0 +1,24 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */
package druid.examples.web;

public interface UpdateStreamFactory
{
  public UpdateStream build();
}
@@ -0,0 +1,134 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package druid.examples.web;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Throwables;
import com.metamx.common.parsers.TimestampParser;
import com.metamx.druid.guava.Runnables;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@JsonTypeName("webstream")
public class WebFirehoseFactory implements FirehoseFactory
{
  private static final EmittingLogger log = new EmittingLogger(WebFirehoseFactory.class);
  private final String timeFormat;
  private final UpdateStreamFactory factory;
  private final long queueWaitTime = 15L;

  @JsonCreator
  public WebFirehoseFactory(
      @JsonProperty("url") String url,
      @JsonProperty("renamedDimensions") Map<String, String> renamedDimensions,
      @JsonProperty("timeDimension") String timeDimension,
      @JsonProperty("timeFormat") String timeFormat
  )
  {
    this(
        new RenamingKeysUpdateStreamFactory(
            new InputSupplierUpdateStreamFactory(new WebJsonSupplier(url), timeDimension),
            renamedDimensions
        ), timeFormat
    );
  }

  public WebFirehoseFactory(UpdateStreamFactory factory, String timeFormat)
  {
    this.factory = factory;
    if (timeFormat == null) {
      this.timeFormat = "auto";
    } else {
      this.timeFormat = timeFormat;
    }
  }

  @Override
  public Firehose connect() throws IOException
  {

    final UpdateStream updateStream = factory.build();
    updateStream.start();

    return new Firehose()
    {
      Map<String, Object> map;
      private final Runnable doNothingRunnable = Runnables.getNoopRunnable();

      @Override
      public boolean hasMore()
      {
        try {
          map = updateStream.pollFromQueue(queueWaitTime, TimeUnit.SECONDS);
          return map != null;
        }
        catch (InterruptedException e) {
          throw Throwables.propagate(e);
        }
      }

      @Override
      public InputRow nextRow()
      {
        try {
          DateTime date = TimestampParser.createTimestampParser(timeFormat)
                                         .apply(map.get(updateStream.getTimeDimension()).toString());
          return new MapBasedInputRow(
              date.getMillis(),
              new ArrayList(map.keySet()),
              map
          );
        }
        catch (Exception e) {
          throw Throwables.propagate(e);
        }
        finally {
          map = null;
        }
      }

      @Override
      public Runnable commit()
      {
        // ephemera in, ephemera out.
        return doNothingRunnable; // reuse the same object each time
      }

      @Override
      public void close() throws IOException
      {
        updateStream.stop();
      }
    };
  }
}
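A wiring sketch (not in the commit). In a realtime spec this factory is bound from JSON via the "webstream" type registered in RealtimeStandaloneMain; constructed by hand it looks like this, with a placeholder URL (ImmutableMap is Guava's com.google.common.collect.ImmutableMap):

WebFirehoseFactory factory = new WebFirehoseFactory(
    "http://example.com/updates.json",   // hypothetical JSON-lines endpoint
    ImmutableMap.of("ts", "time"),       // rename the raw "ts" key to "time"
    "ts",                                // timestamp key as it appears in the raw feed
    "auto"                               // timeFormat; null also falls back to "auto"
);

Firehose firehose = factory.connect();
while (firehose.hasMore()) {
  InputRow row = firehose.nextRow();
  // hand row to the plumber...
}
firehose.close();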
@@ -0,0 +1,57 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package druid.examples.web;

import com.google.common.io.InputSupplier;
import com.metamx.emitter.EmittingLogger;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;

public class WebJsonSupplier implements InputSupplier<BufferedReader>
{
  private static final EmittingLogger log = new EmittingLogger(WebJsonSupplier.class);

  private String urlString;
  private URL url;

  public WebJsonSupplier(String urlString)
  {
    this.urlString = urlString;
    try {
      this.url = new URL(urlString);
    }
    catch (Exception e) {
      log.error(e, "Malformed url");
    }
  }

  @Override
  public BufferedReader getInput() throws IOException
  {
    URL url = new URL(urlString);
    URLConnection connection = url.openConnection();
    connection.setDoInput(true);
    return new BufferedReader(new InputStreamReader(url.openStream()));
  }
}
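Worth noting (a sketch, not in the diff): getInput() opens a fresh connection on every call, which is what lets InputSupplierUpdateStream's read loop reconnect after an IOException. The URL below is a placeholder.

WebJsonSupplier supplier = new WebJsonSupplier("http://example.com/updates.json");
BufferedReader reader = supplier.getInput(); // throws IOException for unreachable hosts
String firstEvent = reader.readLine();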
@@ -0,0 +1,111 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package druid.examples.web;

import com.google.common.io.InputSupplier;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class InputSupplierUpdateStreamTest
{
  private final long waitTime = 1L;
  private final TimeUnit unit = TimeUnit.SECONDS;
  private final ArrayList<String> dimensions = new ArrayList<String>();
  private InputSupplier testCaseSupplier;
  Map<String, Object> expectedAnswer = new HashMap<String, Object>();
  String timeDimension;

  @Before
  public void setUp()
  {
    timeDimension = "time";
    testCaseSupplier = new TestCaseSupplier(
        "{\"item1\": \"value1\","
        + "\"item2\":2,"
        + "\"time\":1372121562 }"
    );

    dimensions.add("item1");
    dimensions.add("item2");
    dimensions.add("time");

    expectedAnswer.put("item1", "value1");
    expectedAnswer.put("item2", 2);
    expectedAnswer.put("time", 1372121562);
  }


  @Test
  public void basicIngestionCheck() throws Exception
  {
    InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
        testCaseSupplier,
        timeDimension
    );
    updateStream.start();
    Map<String, Object> insertedRow = updateStream.pollFromQueue(waitTime, unit);
    Assert.assertEquals(expectedAnswer, insertedRow);
  }

  //If a timestamp is missing, we should throw away the event
  @Test
  public void missingTimeStampCheck()
  {
    testCaseSupplier = new TestCaseSupplier(
        "{\"item1\": \"value1\","
        + "\"item2\":2}"
    );

    InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
        testCaseSupplier,
        timeDimension
    );
    updateStream.start();
    Assert.assertEquals(updateStream.getQueueSize(), 0);
  }

  //If any other value is missing, we should still add the event and process it properly
  @Test
  public void otherNullValueCheck() throws Exception
  {
    testCaseSupplier = new TestCaseSupplier(
        "{\"item1\": \"value1\","
        + "\"time\":1372121562 }"
    );
    InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(
        testCaseSupplier,
        timeDimension
    );
    updateStream.start();
    Map<String, Object> insertedRow = updateStream.pollFromQueue(waitTime, unit);
    Map<String, Object> expectedAnswer = new HashMap<String, Object>();
    expectedAnswer.put("item1", "value1");
    expectedAnswer.put("time", 1372121562);
    Assert.assertEquals(expectedAnswer, insertedRow);
  }

}
@@ -0,0 +1,94 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package druid.examples.web;

import com.google.common.io.InputSupplier;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class RenamingKeysUpdateStreamTest
{
  private final long waitTime = 15L;
  private final TimeUnit unit = TimeUnit.SECONDS;
  private InputSupplier testCaseSupplier;
  String timeDimension;

  @Before
  public void setUp()
  {
    timeDimension = "time";
    testCaseSupplier = new TestCaseSupplier(
        "{\"item1\": \"value1\","
        + "\"item2\":2,"
        + "\"time\":1372121562 }"
    );
  }

  @Test
  public void testPollFromQueue() throws Exception
  {
    InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
    Map<String, String> renamedKeys = new HashMap<String, String>();
    renamedKeys.put("item1", "i1");
    renamedKeys.put("item2", "i2");
    renamedKeys.put("time", "t");

    RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
    renamer.start();
    Map<String, Object> expectedAnswer = new HashMap<String, Object>();
    expectedAnswer.put("i1", "value1");
    expectedAnswer.put("i2", 2);
    expectedAnswer.put("t", 1372121562);


    Assert.assertEquals(expectedAnswer, renamer.pollFromQueue(waitTime, unit));
  }

  @Test
  public void testGetTimeDimension() throws Exception
  {
    InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
    Map<String, String> renamedKeys = new HashMap<String, String>();
    renamedKeys.put("item1", "i1");
    renamedKeys.put("item2", "i2");
    renamedKeys.put("time", "t");

    RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
    Assert.assertEquals("t", renamer.getTimeDimension());
  }

  @Test
  public void testMissingTimeRename() throws Exception
  {
    InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
    Map<String, String> renamedKeys = new HashMap<String, String>();
    renamedKeys.put("item1", "i1");
    renamedKeys.put("item2", "i2");
    RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
    Assert.assertEquals("time", renamer.getTimeDimension());
  }

}
@@ -0,0 +1,42 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package druid.examples.web;

import com.google.common.io.InputSupplier;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

public class TestCaseSupplier implements InputSupplier<BufferedReader>
{
  private final String testString;

  public TestCaseSupplier(String testString)
  {
    this.testString = testString;
  }

  @Override
  public BufferedReader getInput() throws IOException
  {
    return new BufferedReader(new StringReader(testString));
  }
}
@@ -0,0 +1,223 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package druid.examples.web;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.firehose.Firehose;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class WebFirehoseFactoryTest
{
  private List<String> dimensions = Lists.newArrayList();
  private WebFirehoseFactory webbie;
  private WebFirehoseFactory webbie1;

  @Before
  public void setUp() throws Exception
  {
    dimensions.add("item1");
    dimensions.add("item2");
    dimensions.add("time");
    webbie = new WebFirehoseFactory(
        new UpdateStreamFactory()
        {
          @Override
          public UpdateStream build()
          {
            return new MyUpdateStream(ImmutableMap.<String, Object>of("item1", "value1", "item2", 2, "time", "1372121562"));
          }
        },
        "posix"
    );

    webbie1 = new WebFirehoseFactory(
        new UpdateStreamFactory()
        {
          @Override
          public UpdateStream build()
          {
            return new MyUpdateStream(ImmutableMap.<String, Object>of("item1", "value1", "item2", 2, "time", "1373241600000"));
          }
        },
        "auto"
    );

  }

  @Test
  public void testDimensions() throws Exception
  {
    InputRow inputRow;
    Firehose firehose = webbie.connect();
    if (firehose.hasMore()) {
      inputRow = firehose.nextRow();
    } else {
      throw new RuntimeException("queue is empty");
    }
    List<String> actualAnswer = inputRow.getDimensions();
    Collections.sort(actualAnswer);
    Assert.assertEquals(actualAnswer, dimensions);
  }

  @Test
  public void testPosixTimeStamp() throws Exception
  {
    InputRow inputRow;
    Firehose firehose = webbie.connect();
    if (firehose.hasMore()) {
      inputRow = firehose.nextRow();
    } else {
      throw new RuntimeException("queue is empty");
    }
    long expectedTime = 1372121562L * 1000L;
    Assert.assertEquals(expectedTime, inputRow.getTimestampFromEpoch());
  }

  @Test
  public void testISOTimeStamp() throws Exception
  {
    WebFirehoseFactory webbie3 = new WebFirehoseFactory(
        new UpdateStreamFactory()
        {
          @Override
          public UpdateStream build()
          {
            return new MyUpdateStream(ImmutableMap.<String, Object>of("item1", "value1", "item2", 2, "time", "2013-07-08"));
          }
        },
        "auto"
    );
    Firehose firehose1 = webbie3.connect();
    if (firehose1.hasMore()) {
      long milliSeconds = firehose1.nextRow().getTimestampFromEpoch();
      DateTime date = new DateTime("2013-07-08");
      Assert.assertEquals(date.getMillis(), milliSeconds);
    } else {
      Assert.assertFalse("hasMore returned false", true);
    }
  }

  @Test
  public void testAutoIsoTimeStamp() throws Exception
  {
    WebFirehoseFactory webbie2 = new WebFirehoseFactory(
        new UpdateStreamFactory()
        {
          @Override
          public UpdateStream build()
          {
            return new MyUpdateStream(ImmutableMap.<String, Object>of("item1", "value1", "item2", 2, "time", "2013-07-08"));
          }
        },
        null
    );
    Firehose firehose2 = webbie2.connect();
    if (firehose2.hasMore()) {
      long milliSeconds = firehose2.nextRow().getTimestampFromEpoch();
      DateTime date = new DateTime("2013-07-08");
      Assert.assertEquals(date.getMillis(), milliSeconds);
    } else {
      Assert.assertFalse("hasMore returned false", true);
    }
  }

  @Test
  public void testAutoMilliSecondsTimeStamp() throws Exception
  {
    Firehose firehose3 = webbie1.connect();
    if (firehose3.hasMore()) {
      long milliSeconds = firehose3.nextRow().getTimestampFromEpoch();
      DateTime date = new DateTime("2013-07-08");
      Assert.assertEquals(date.getMillis(), milliSeconds);
    } else {
      Assert.assertFalse("hasMore returned false", true);
    }
  }

  @Test
  public void testGetDimension() throws Exception
  {
    InputRow inputRow;
    Firehose firehose = webbie1.connect();
    if (firehose.hasMore()) {
      inputRow = firehose.nextRow();
    } else {
      throw new RuntimeException("queue is empty");
    }

    List<String> column1 = Lists.newArrayList();
    column1.add("value1");
    Assert.assertEquals(column1, inputRow.getDimension("item1"));
  }

  @Test
  public void testGetFloatMetric() throws Exception
  {
    InputRow inputRow;
    Firehose firehose = webbie1.connect();
    if (firehose.hasMore()) {
      inputRow = firehose.nextRow();
    } else {
      throw new RuntimeException("queue is empty");
    }

    Assert.assertEquals((float) 2.0, inputRow.getFloatMetric("item2"), 0.0f);
  }

  private static class MyUpdateStream implements UpdateStream
  {
    private static ImmutableMap<String, Object> map;

    public MyUpdateStream(ImmutableMap<String, Object> map)
    {
      this.map = map;
    }

    @Override
    public Map<String, Object> pollFromQueue(long waitTime, TimeUnit unit) throws InterruptedException
    {
      return map;
    }

    @Override
    public String getTimeDimension()
    {
      return "time";
    }

    @Override
    public void start()
    {
    }

    @Override
    public void stop()
    {
    }
  }
}
@@ -0,0 +1,36 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package druid.examples.web;

import org.junit.Test;

import java.net.UnknownHostException;

public class WebJsonSupplierTest
{
  @Test(expected = UnknownHostException.class)
  public void checkInvalidUrl() throws Exception
  {

    String invalidURL = "http://invalid.url";
    WebJsonSupplier supplier = new WebJsonSupplier(invalidURL);
    supplier.getInput();
  }
}
@@ -30,7 +30,6 @@ import com.metamx.druid.Query;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.indexing.common.TaskLock;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolbox;
@@ -38,6 +37,7 @@ import com.metamx.druid.indexing.common.actions.LockAcquireAction;
import com.metamx.druid.indexing.common.actions.LockListAction;
import com.metamx.druid.indexing.common.actions.LockReleaseAction;
import com.metamx.druid.indexing.common.actions.SegmentInsertAction;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.query.FinalizeResultsQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
@@ -226,6 +226,28 @@ public class RealtimeIndexTask extends AbstractTask
          toolbox.getTaskActionClient().submit(new LockReleaseAction(segment.getInterval()));
        }
      }

      @Override
      public void announceSegments(Iterable<DataSegment> segments) throws IOException
      {
        for (DataSegment segment : segments) {
          toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval()));
        }
        toolbox.getSegmentAnnouncer().announceSegments(segments);
      }

      @Override
      public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
      {
        try {
          toolbox.getSegmentAnnouncer().unannounceSegments(segments);
        }
        finally {
          for (DataSegment segment : segments) {
            toolbox.getTaskActionClient().submit(new LockReleaseAction(segment.getInterval()));
          }
        }
      }
    };

    // NOTE: getVersion will block if there is lock contention, which will block plumber.getSink
@@ -106,9 +106,7 @@ public class TaskMasterLifecycle
                    Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle);
                    leaderLifecycle.addManagedInstance(taskConsumer);

                    if (indexerCoordinatorConfig.isAutoScalingEnabled()) {
                      leaderLifecycle.addManagedInstance(resourceManagementScheduler);
                    }
                    leaderLifecycle.addManagedInstance(resourceManagementScheduler);

                    try {
                      leaderLifecycle.start();
@@ -55,10 +55,6 @@ import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.RedirectFilter;
import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
@@ -93,12 +89,17 @@ import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.indexing.coordinator.scaling.AutoScalingStrategy;
import com.metamx.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy;
import com.metamx.druid.indexing.coordinator.scaling.NoopAutoScalingStrategy;
import com.metamx.druid.indexing.coordinator.scaling.NoopResourceManagementScheduler;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerConfig;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy;
import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagmentConfig;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
@@ -687,53 +688,64 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
  private void initializeResourceManagement(final JacksonConfigManager configManager)
  {
    if (resourceManagementSchedulerFactory == null) {
      resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
      {
        @Override
        public ResourceManagementScheduler build(TaskRunner runner)
      if (!config.isAutoScalingEnabled()) {
        resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
        {
          final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool(
              1,
              new ThreadFactoryBuilder()
                  .setDaemon(true)
                  .setNameFormat("ScalingExec--%d")
                  .build()
          );
          final AtomicReference<WorkerSetupData> workerSetupData = configManager.watch(
              WorkerSetupData.CONFIG_KEY, WorkerSetupData.class
          );

          AutoScalingStrategy strategy;
          if (config.getAutoScalingImpl().equalsIgnoreCase("ec2")) {
            strategy = new EC2AutoScalingStrategy(
                getJsonMapper(),
                new AmazonEC2Client(
                    new BasicAWSCredentials(
                        PropUtils.getProperty(getProps(), "com.metamx.aws.accessKey"),
                        PropUtils.getProperty(getProps(), "com.metamx.aws.secretKey")
                    )
                ),
                getConfigFactory().build(EC2AutoScalingStrategyConfig.class),
                workerSetupData
            );
          } else if (config.getAutoScalingImpl().equalsIgnoreCase("noop")) {
            strategy = new NoopAutoScalingStrategy();
          } else {
            throw new ISE("Invalid strategy implementation: %s", config.getAutoScalingImpl());
          @Override
          public ResourceManagementScheduler build(TaskRunner runner)
          {
            return new NoopResourceManagementScheduler();
          }
        };
      } else {
        resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
        {
          @Override
          public ResourceManagementScheduler build(TaskRunner runner)
          {
            final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool(
                1,
                new ThreadFactoryBuilder()
                    .setDaemon(true)
                    .setNameFormat("ScalingExec--%d")
                    .build()
            );
            final AtomicReference<WorkerSetupData> workerSetupData = configManager.watch(
                WorkerSetupData.CONFIG_KEY, WorkerSetupData.class
            );

            return new ResourceManagementScheduler(
                runner,
                new SimpleResourceManagementStrategy(
                    strategy,
                    getConfigFactory().build(SimpleResourceManagmentConfig.class),
            AutoScalingStrategy strategy;
            if (config.getAutoScalingImpl().equalsIgnoreCase("ec2")) {
              strategy = new EC2AutoScalingStrategy(
                  getJsonMapper(),
                  new AmazonEC2Client(
                      new BasicAWSCredentials(
                          PropUtils.getProperty(getProps(), "com.metamx.aws.accessKey"),
                          PropUtils.getProperty(getProps(), "com.metamx.aws.secretKey")
                      )
                  ),
                  getConfigFactory().build(EC2AutoScalingStrategyConfig.class),
                  workerSetupData
              ),
              getConfigFactory().build(ResourceManagementSchedulerConfig.class),
              scalingScheduledExec
          );
        }
      };
              );
            } else if (config.getAutoScalingImpl().equalsIgnoreCase("noop")) {
              strategy = new NoopAutoScalingStrategy();
            } else {
              throw new ISE("Invalid strategy implementation: %s", config.getAutoScalingImpl());
            }

            return new ResourceManagementScheduler(
                runner,
                new SimpleResourceManagementStrategy(
                    strategy,
                    getConfigFactory().build(SimpleResourceManagmentConfig.class),
                    workerSetupData
                ),
                getConfigFactory().build(ResourceManagementSchedulerConfig.class),
                scalingScheduledExec
            );
          }
        };
      }
    }
  }
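Because the added and removed lines interleave above, here is a paraphrase of the resulting control flow with hypothetical stand-in types (not the literal Druid classes):

interface Scheduler { void start(); }

static Scheduler buildScheduler(boolean autoScalingEnabled, String autoScalingImpl)
{
  if (!autoScalingEnabled) {
    // new in this commit: a no-op scheduler when autoscaling is off
    return new Scheduler() { public void start() { /* log "Autoscaling is disabled." */ } };
  }
  if (!"ec2".equalsIgnoreCase(autoScalingImpl) && !"noop".equalsIgnoreCase(autoScalingImpl)) {
    throw new IllegalStateException("Invalid strategy implementation: " + autoScalingImpl);
  }
  // pre-existing path: pick the ec2 or noop AutoScalingStrategy and build the
  // real ResourceManagementScheduler on a daemon scheduled executor
  return new Scheduler() { public void start() { /* scale workers via the chosen strategy */ } };
}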
@@ -0,0 +1,52 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.indexing.coordinator.scaling;

import com.metamx.common.logger.Logger;

/**
 */
public class NoopResourceManagementScheduler extends ResourceManagementScheduler
{
  private static final Logger log = new Logger(NoopResourceManagementScheduler.class);

  public NoopResourceManagementScheduler()
  {
    super(null, null, null, null);
  }

  @Override
  public void start()
  {
    log.info("Autoscaling is disabled.");
  }

  @Override
  public void stop()
  {
    // do nothing
  }

  @Override
  public ScalingStats getStats()
  {
    return new ScalingStats(0);
  }
}
pom.xml
@@ -39,7 +39,7 @@
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <metamx.java-util.version>0.22.3</metamx.java-util.version>
        <apache.curator.version>2.0.2-21-22</apache.curator.version>
        <apache.curator.version>2.1.0-incubating</apache.curator.version>
    </properties>

    <modules>
@@ -65,7 +65,7 @@
        <dependency>
            <groupId>com.metamx</groupId>
            <artifactId>http-client</artifactId>
            <version>0.7.1</version>
            <version>0.8.1</version>
        </dependency>
        <dependency>
            <groupId>com.metamx</groupId>
@@ -21,6 +21,7 @@ package com.metamx.druid.coordination;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
@@ -37,6 +38,7 @@ import org.apache.curator.utils.ZKPaths;

import java.io.File;
import java.io.IOException;
import java.util.List;

/**
 */
@@ -103,7 +105,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
      curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
      curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());

      loadCache();
      if (config.isLoadFromSegmentCacheEnabled()) {
        loadCache();
      }

      loadQueueCache.getListenable().addListener(
          new PathChildrenCacheListener()
@@ -136,9 +140,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
          }

          log.makeAlert(e, "Segment load/unload: uncaught exception.")
              .addData("node", path)
              .addData("nodeProperties", segment)
              .emit();
             .addData("node", path)
             .addData("nodeProperties", segment)
             .emit();
        }

        break;
@@ -192,12 +196,14 @@ public class ZkCoordinator implements DataSegmentChangeHandler
      return;
    }

    List<DataSegment> cachedSegments = Lists.newArrayList();
    for (File file : baseDir.listFiles()) {
      log.info("Loading segment cache file [%s]", file);
      try {
        DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
        if (serverManager.isSegmentCached(segment)) {
          addSegment(segment);
          cachedSegments.add(segment);
          //addSegment(segment);
        } else {
          log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier());
@@ -213,6 +219,8 @@ public class ZkCoordinator implements DataSegmentChangeHandler
            .emit();
      }
    }

    addSegments(cachedSegments);
  }

  @Override
@@ -222,14 +230,16 @@ public class ZkCoordinator implements DataSegmentChangeHandler
      serverManager.loadSegment(segment);

      File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
      try {
        jsonMapper.writeValue(segmentInfoCacheFile, segment);
      }
      catch (IOException e) {
        removeSegment(segment);
        throw new SegmentLoadingException(
            e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
        );
      if (!segmentInfoCacheFile.exists()) {
        try {
          jsonMapper.writeValue(segmentInfoCacheFile, segment);
        }
        catch (IOException e) {
          removeSegment(segment);
          throw new SegmentLoadingException(
              e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
          );
        }
      }

      try {
@@ -239,14 +249,51 @@ public class ZkCoordinator implements DataSegmentChangeHandler
        removeSegment(segment);
        throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
      }

    }
    catch (SegmentLoadingException e) {
      log.makeAlert(e, "Failed to load segment for dataSource")
          .addData("segment", segment)
          .emit();
         .addData("segment", segment)
         .emit();
    }
  }

  public void addSegments(Iterable<DataSegment> segments)
  {
    try {
      for (DataSegment segment : segments) {
        serverManager.loadSegment(segment);

        File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
        if (!segmentInfoCacheFile.exists()) {
          try {
            jsonMapper.writeValue(segmentInfoCacheFile, segment);
          }
          catch (IOException e) {
            removeSegment(segment);
            throw new SegmentLoadingException(
                e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
            );
          }
        }
      }

      try {
        announcer.announceSegments(segments);
      }
      catch (IOException e) {
        removeSegments(segments);
        throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments);
      }
    }
    catch (SegmentLoadingException e) {
      log.makeAlert(e, "Failed to load segments for dataSource")
          .addData("segments", segments)
          .emit();
    }
  }


  @Override
  public void removeSegment(DataSegment segment)
  {
@@ -262,8 +309,29 @@ public class ZkCoordinator implements DataSegmentChangeHandler
    }
    catch (Exception e) {
      log.makeAlert(e, "Failed to remove segment")
          .addData("segment", segment)
          .emit();
         .addData("segment", segment)
         .emit();
    }
  }

  public void removeSegments(Iterable<DataSegment> segments)
  {
    try {
      for (DataSegment segment : segments) {
        serverManager.dropSegment(segment);

        File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
        if (!segmentInfoCacheFile.delete()) {
          log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
        }
      }

      announcer.unannounceSegments(segments);
    }
    catch (Exception e) {
      log.makeAlert(e, "Failed to remove segments")
          .addData("segments", segments)
          .emit();
    }
  }
}
@@ -29,4 +29,10 @@ public abstract class ZkCoordinatorConfig
{
  @Config("druid.paths.segmentInfoCache")
  public abstract File getSegmentInfoCacheDirectory();

  @Config("druid.segmentCache.enable")
  public boolean isLoadFromSegmentCacheEnabled()
  {
    return true;
  }
}
@@ -443,17 +443,17 @@ public class DruidMaster
        curator, ZKPaths.makePath(zkPaths.getMasterPath(), MASTER_OWNER_NODE), config.getHost()
    );

    newLeaderLatch.attachListener(
    newLeaderLatch.addListener(
        new LeaderLatchListener()
        {
          @Override
          public void becomeMaster()
          public void isLeader()
          {
            DruidMaster.this.becomeMaster();
          }

          @Override
          public void stopBeingMaster()
          public void notLeader()
          {
            DruidMaster.this.stopBeingMaster();
          }
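For context (not in the diff): this hunk tracks the Apache Curator 2.1.0-incubating API, where LeaderLatch.attachListener() became addListener() and the LeaderLatchListener callbacks were renamed to isLeader()/notLeader(). A sketch of the new contract, with a placeholder latch path and participant id:

import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

LeaderLatch latch = new LeaderLatch(curatorFramework, "/druid/master/_OWNER_", "host:8080");
latch.addListener(
    new LeaderLatchListener()
    {
      @Override
      public void isLeader()
      {
        // gained leadership: take over master duties
      }

      @Override
      public void notLeader()
      {
        // lost leadership: stand down
      }
    }
);
latch.start();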
@@ -1,10 +1,9 @@
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<?xml version="1.0"?>
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
    <id>bin</id>
    <formats>
        <format>tar.gz</format>
    </formats>
    <formats>
        <format>tar.gz</format>
    </formats>
    <fileSets>
        <fileSet>
            <directory>../examples/config</directory>
@@ -13,6 +12,34 @@
            </includes>
            <outputDirectory>config</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>../examples/config/broker</directory>
            <includes>
                <include>*</include>
            </includes>
            <outputDirectory>config/broker</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>../examples/config/master</directory>
            <includes>
                <include>*</include>
            </includes>
            <outputDirectory>config/master</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>../examples/config/realtime</directory>
            <includes>
                <include>*</include>
            </includes>
            <outputDirectory>config/realtime</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>../examples/config/compute</directory>
            <includes>
                <include>*</include>
            </includes>
            <outputDirectory>config/compute</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>../examples/bin</directory>
            <includes>
@@ -28,14 +55,21 @@
            </includes>
            <outputDirectory>lib</outputDirectory>
        </fileSet>
        <fileSet>
        <fileSet>
            <directory>../services/target</directory>
            <includes>
                <include>druid-services-*-selfcontained.jar</include>
            </includes>
            <outputDirectory>lib</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>../examples/bin/examples</directory>
            <includes>
                <include>**</include>
            </includes>
            <outputDirectory>examples</outputDirectory>
        </fileSet>
        <fileSet>
        <fileSet>
            <directory>../examples/bin/examples/twitter</directory>
            <includes>
                <include>*sh</include>