queries working at this point, tests pass, still need more tests and fix rules interface

This commit is contained in:
fjy 2014-01-07 16:24:53 -08:00
parent e11952f3b6
commit f44509b530
26 changed files with 416 additions and 282 deletions

View File

@ -61,7 +61,8 @@ public class DruidServer implements Comparable
node.getHost(),
config.getMaxSize(),
type,
config.getTier()
config.getTier(),
0
);
}
@ -71,10 +72,11 @@ public class DruidServer implements Comparable
@JsonProperty("host") String host,
@JsonProperty("maxSize") long maxSize,
@JsonProperty("type") String type,
@JsonProperty("tier") String tier
@JsonProperty("tier") String tier,
@JsonProperty("priority") int priority
)
{
this.metadata = new DruidServerMetadata(name, host, maxSize, type, tier);
this.metadata = new DruidServerMetadata(name, host, maxSize, type, tier, priority);
this.dataSources = new ConcurrentHashMap<String, DruidDataSource>();
this.segments = new ConcurrentHashMap<String, DataSegment>();
@ -120,6 +122,12 @@ public class DruidServer implements Comparable
return metadata.getTier();
}
@JsonProperty
public int getPriority()
{
return metadata.getPriority();
}
@JsonProperty
public Map<String, DataSegment> getSegments()
{

View File

@ -34,6 +34,9 @@ public class DruidServerConfig
@JsonProperty
private String tier = DruidServer.DEFAULT_TIER;
@JsonProperty
private int priority = 0;
public long getMaxSize()
{
return maxSize;
@ -43,4 +46,9 @@ public class DruidServerConfig
{
return tier;
}
public int getPriority()
{
return priority;
}
}

View File

@ -36,7 +36,12 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
@Override
public int compare(QueryableDruidServer left, QueryableDruidServer right)
{
return Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections());
int retVal = -Ints.compare(left.getServer().getPriority(), right.getServer().getPriority());
if (retVal == 0) {
retVal = Ints.compare(left.getClient().getNumOpenConnections(), right.getClient().getNumOpenConnections());
}
return retVal;
}
};
@ -84,9 +89,12 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
synchronized (this) {
final int size = servers.size();
switch (size) {
case 0: return null;
case 1: return servers.iterator().next();
default: return Collections.min(servers, comparator);
case 0:
return null;
case 1:
return servers.iterator().next();
default:
return Collections.min(servers, comparator);
}
}
}

View File

@ -150,8 +150,6 @@ public class Announcer
}
}
log.info("Announcing with Curator %s", curator); // TODO
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final String parentPath = pathAndNode.getPath();

View File

@ -67,7 +67,8 @@ public class StorageNodeModule implements Module
node.getHost(),
config.getMaxSize(),
nodeType.getNodeType(),
config.getTier()
config.getTier(),
config.getPriority()
);
}
}

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.bridge;
import com.google.inject.BindingAnnotation;

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.bridge;
import io.druid.curator.CuratorConfig;

View File

@ -1,38 +0,0 @@
package io.druid.server.bridge;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
/**
*/
public class BridgeJettyServerInitializer implements JettyServerInitializer
{
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
ServletHolder holderPwd = new ServletHolder("default", DefaultServlet.class);
root.addServlet(holderPwd, "/");
//root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
root.addFilter(GzipFilter.class, "/*", null);
// Can't use '/*' here because of Guice and Jetty static content conflicts
// The coordinator really needs a standarized api path
root.addFilter(GuiceFilter.class, "/status/*", null);
HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{root});
server.setHandler(handlerList);
}
}

View File

@ -0,0 +1,121 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.bridge;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.inject.Inject;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
import java.net.URL;
import java.util.List;
/**
*/
public class BridgeQuerySegmentWalker implements QuerySegmentWalker
{
private final ServerDiscoverySelector brokerSelector;
private final HttpClient httpClient;
private final ObjectMapper jsonMapper;
private final StatusResponseHandler responseHandler;
@Inject
public BridgeQuerySegmentWalker(
ServerDiscoverySelector brokerSelector,
@Global HttpClient httpClient,
ObjectMapper jsonMapper
)
{
this.brokerSelector = brokerSelector;
this.httpClient = httpClient;
this.jsonMapper = jsonMapper;
this.responseHandler = new StatusResponseHandler(Charsets.UTF_8);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(
Query<T> query, Iterable<Interval> intervals
)
{
return makeRunner();
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(
Query<T> query, Iterable<SegmentDescriptor> specs
)
{
return makeRunner();
}
private <T> QueryRunner<T> makeRunner()
{
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query)
{
try {
Server instance = brokerSelector.pick();
if (instance == null) {
return Sequences.empty();
}
final String url = String.format(
"http://%s/druid/v2/",
brokerSelector.pick().getHost()
);
StatusResponseHolder response = httpClient.post(new URL(url))
.setContent(
"application/json",
jsonMapper.writeValueAsBytes(query)
)
.go(responseHandler)
.get();
List<T> results = jsonMapper.readValue(
response.getContent(), new TypeReference<List<T>>()
{
}
);
return Sequences.simple(results);
}
catch (Exception e) {
return Sequences.empty();
}
}
};
}
}

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.bridge;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -8,7 +27,6 @@ import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.concurrent.Execs;
import io.druid.db.DatabaseSegmentManager;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.realtime.DbSegmentPublisher;
import io.druid.server.coordination.BaseZkCoordinator;
import io.druid.server.coordination.DataSegmentChangeCallback;
@ -51,13 +69,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator
}
@Override
public void createCacheDir()
{
// do nothing
}
@Override
public void loadCache()
public void loadLocalCache()
{
// do nothing
}
@ -85,7 +97,6 @@ public class BridgeZkCoordinator extends BaseZkCoordinator
{
if (theSegment.equals(segment)) {
callback.execute();
log.info("Callback executed");
}
return ServerView.CallbackAction.CONTINUE;
}

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.bridge;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -41,21 +60,25 @@ import java.util.concurrent.atomic.AtomicReference;
@ManageLifecycle
public class DruidClusterBridge
{
public static final String CONNECTOR_OWNER_NODE = "_CONNECTOR";
public static final String BRIDGE_OWNER_NODE = "_BRIDGE";
public static final String NODE_TYPE = "bridge";
private static final EmittingLogger log = new EmittingLogger(DruidClusterBridge.class);
private final ObjectMapper jsonMapper;
private final DruidClusterBridgeConfig config;
private final BridgeZkCoordinator bridgeZkCoordinator; // watches for assignments from main cluster
private final Announcer announcer; //announce self to main cluster
private final ServerInventoryView<Object> serverInventoryView;
private final CuratorFramework curator;
private final ScheduledExecutorService exec;
private final AtomicReference<LeaderLatch> leaderLatch;
private final DruidNode self;
// Communicates to the ZK cluster that this bridge node is deployed at
private final CuratorFramework curator;
private final AtomicReference<LeaderLatch> leaderLatch;
// Communicates to the remote (parent) ZK cluster
private final BridgeZkCoordinator bridgeZkCoordinator;
private final Announcer announcer;
private final ServerInventoryView<Object> serverInventoryView;
private final Map<DataSegment, Integer> segments = Maps.newHashMap();
private final Object lock = new Object();
@ -66,14 +89,14 @@ public class DruidClusterBridge
public DruidClusterBridge(
ObjectMapper jsonMapper,
DruidClusterBridgeConfig config,
ScheduledExecutorFactory scheduledExecutorFactory,
@Self DruidNode self,
CuratorFramework curator,
AtomicReference<LeaderLatch> leaderLatch,
BridgeZkCoordinator bridgeZkCoordinator,
@Bridge Announcer announcer,
@Bridge final AbstractDataSegmentAnnouncer dataSegmentAnnouncer,
ServerInventoryView serverInventoryView,
CuratorFramework curator,
ScheduledExecutorFactory scheduledExecutorFactory,
AtomicReference<LeaderLatch> leaderLatch,
@Self DruidNode self
ServerInventoryView serverInventoryView
)
{
this.jsonMapper = jsonMapper;
@ -87,8 +110,6 @@ public class DruidClusterBridge
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
this.self = self;
log.info("Local curator: [%s]", curator); // TODO
serverInventoryView.registerSegmentCallback(
Executors.newFixedThreadPool(
1,
@ -128,7 +149,7 @@ public class DruidClusterBridge
synchronized (lock) {
Integer count = segments.get(segment);
if (count != null) {
if (count == 0) {
if (count == 1) {
dataSegmentAnnouncer.unannounceSegment(segment);
segments.remove(segment);
} else {
@ -174,7 +195,7 @@ public class DruidClusterBridge
private LeaderLatch createNewLeaderLatch()
{
final LeaderLatch newLeaderLatch = new LeaderLatch(
curator, ZKPaths.makePath(config.getConnectorPath(), CONNECTOR_OWNER_NODE), self.getHost()
curator, ZKPaths.makePath(config.getConnectorPath(), BRIDGE_OWNER_NODE), self.getHost()
);
newLeaderLatch.addListener(
@ -266,13 +287,13 @@ public class DruidClusterBridge
if (totalMaxSize == 0) {
log.warn("No servers founds!");
} else {
DruidServerMetadata me = new DruidServerMetadata(
self.getHost(),
self.getHost(),
totalMaxSize,
NODE_TYPE,
config.getTier()
config.getTier(),
config.getPriority()
);
try {

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.bridge;
import io.druid.client.DruidServer;
@ -8,7 +27,6 @@ import org.skife.config.Default;
/**
*/
// TODO: make sure that this uses sub cluster zk paths
public abstract class DruidClusterBridgeConfig extends ZkPathsConfig
{
@Config("druid.server.tier")
@ -22,4 +40,11 @@ public abstract class DruidClusterBridgeConfig extends ZkPathsConfig
@Config("druid.bridge.period")
@Default("PT60s")
public abstract Duration getPeriod();
@Config("druid.bridge.broker.serviceName")
public abstract String getBrokerServiceName();
@Config("druid.server.priority")
@Default("0")
public abstract int getPriority();
}

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -48,12 +67,13 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
@LifecycleStart
public void start() throws IOException
{
log.info("Starting zkCoordinator for server[%s]", me);
synchronized (lock) {
if (started) {
return;
}
log.info("Starting zkCoordinator for server[%s]", me);
final String loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
final String servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName());
@ -67,14 +87,11 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
);
try {
createCacheDir();
log.info("Remote Curator[%s]", curator); // TODO
curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
loadCache();
loadLocalCache();
loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener()
@ -173,9 +190,7 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
}
}
public abstract void createCacheDir();
public abstract void loadCache();
public abstract void loadLocalCache();
public abstract DataSegmentChangeHandler getDataSegmentChangeHandler();
}

View File

@ -31,6 +31,7 @@ public class DruidServerMetadata
private final long maxSize;
private final String tier;
private final String type;
private final int priority;
@JsonCreator
public DruidServerMetadata(
@ -38,7 +39,8 @@ public class DruidServerMetadata
@JsonProperty("host") String host,
@JsonProperty("maxSize") long maxSize,
@JsonProperty("type") String type,
@JsonProperty("tier") String tier
@JsonProperty("tier") String tier,
@JsonProperty("priority") int priority
)
{
this.name = name;
@ -46,6 +48,7 @@ public class DruidServerMetadata
this.maxSize = maxSize;
this.tier = tier;
this.type = type;
this.priority = priority;
}
@JsonProperty
@ -78,6 +81,12 @@ public class DruidServerMetadata
return type;
}
@JsonProperty
public int getPriority()
{
return priority;
}
@Override
public String toString()
{
@ -87,6 +96,7 @@ public class DruidServerMetadata
", maxSize=" + maxSize +
", tier='" + tier + '\'' +
", type='" + type + '\'' +
", priority='" + priority + '\'' +
'}';
}
}

View File

@ -64,16 +64,11 @@ public class ZkCoordinator extends BaseZkCoordinator
}
@Override
public void createCacheDir()
{
config.getInfoDir().mkdirs();
}
@Override
public void loadCache()
public void loadLocalCache()
{
final long start = System.currentTimeMillis();
File baseDir = config.getInfoDir();
if (!baseDir.exists()) {
if (!baseDir.exists() && !config.getInfoDir().mkdirs()) {
return;
}
@ -107,7 +102,7 @@ public class ZkCoordinator extends BaseZkCoordinator
@Override
public void execute()
{
// do nothing
log.info("Cache load took %,d ms", System.currentTimeMillis() - start);
}
}
);

View File

@ -35,11 +35,8 @@ import org.joda.time.DateTime;
@JsonSubTypes.Type(name = "loadByInterval", value = IntervalLoadRule.class),
@JsonSubTypes.Type(name = "loadForever", value = ForeverLoadRule.class),
@JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class),
@JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class),
@JsonSubTypes.Type(name = "loadBySize", value = SizeLoadRule.class),
@JsonSubTypes.Type(name = "dropBySize", value = SizeDropRule.class)
@JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class)
})
public interface Rule
{
public String getType();

View File

@ -1,71 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Range;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
/**
*/
public class SizeDropRule extends DropRule
{
private final long low;
private final long high;
private final Range<Long> range;
@JsonCreator
public SizeDropRule(
@JsonProperty("low") long low,
@JsonProperty("high") long high
)
{
this.low = low;
this.high = high;
this.range = Range.closedOpen(low, high);
}
@Override
@JsonProperty
public String getType()
{
return "dropBySize";
}
@JsonProperty
public long getLow()
{
return low;
}
@JsonProperty
public long getHigh()
{
return high;
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return range.contains(segment.getSize());
}
}

View File

@ -1,78 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Range;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import java.util.Map;
/**
*/
public class SizeLoadRule extends LoadRule
{
private final long low;
private final long high;
private final Integer replicants;
private final String tier;
private final Range<Long> range;
@JsonCreator
public SizeLoadRule(
@JsonProperty("low") long low,
@JsonProperty("high") long high,
@JsonProperty("replicants") Integer replicants,
@JsonProperty("tier") String tier
)
{
this.low = low;
this.high = high;
this.replicants = replicants;
this.tier = tier;
this.range = Range.closedOpen(low, high);
}
@Override
public Map<String, Integer> getTieredReplicants()
{
return null;
}
@Override
public int getNumReplicants(String tier)
{
return 0;
}
@Override
public String getType()
{
return "loadBySize";
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return range.contains(segment.getSize());
}
}

View File

@ -6,6 +6,8 @@ var ruleTypes = [
"loadByPeriod",
"dropByInterval",
"dropByPeriod",
"loadForever",
"dropForever",
"JSON"
];
@ -60,6 +62,12 @@ function makeRuleBody(rule) {
case "dropByPeriod":
retVal += makeDropByPeriod(rule);
break;
case "loadForever":
retVal += makeLoadForever(rule);
break;
case "dropForever":
retVal += "";
break;
case "JSON":
retVal += makeJSON();
break;
@ -93,6 +101,12 @@ function makeDropByPeriod(rule) {
return "<span class='rule_label'>period</span><input type='text' name='period' " + "value='" + rule.period + "'/>";
}
function makeLoadForever(rule) {
return "<span class='rule_label'>replicants</span><input type='text' class='short_text' name='replicants' " + "value='" + rule.replicants + "'/>" +
makeTiersDropdown(rule)
;
}
function makeJSON() {
return "<span class='rule_label'>JSON</span><input type='text' class='very_long_text' name='JSON'/>";
}

View File

@ -93,7 +93,8 @@ public class BatchServerInventoryViewTest
"host",
Long.MAX_VALUE,
"type",
"tier"
"tier",
0
),
new BatchDataSegmentAnnouncerConfig()
{

View File

@ -27,6 +27,7 @@ import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.RequestBuilder;
import io.druid.client.DirectDruidClient;
import io.druid.client.DruidServer;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.ReflectionQueryToolChestWarehouse;
@ -91,12 +92,12 @@ public class ServerSelectorTest
);
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
null,
new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0),
client1
);
serverSelector.addServer(queryableDruidServer1);
QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
null,
new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0),
client2
);
serverSelector.addServer(queryableDruidServer2);

View File

@ -83,7 +83,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
MoreExecutors.sameThreadExecutor()
);
final DruidServerMetadata me = new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal");
final DruidServerMetadata me = new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0);
final ZkPathsConfig zkPaths = new ZkPathsConfig()
{

View File

@ -93,7 +93,8 @@ public class BatchDataSegmentAnnouncerTest
"host",
Long.MAX_VALUE,
"type",
"tier"
"tier",
0
),
new BatchDataSegmentAnnouncerConfig()
{

View File

@ -133,7 +133,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostHot",
1000,
"historical",
"hot"
"hot",
0
),
mockPeon
)
@ -148,7 +149,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm",
1000,
"historical",
"normal"
"normal",
0
),
mockPeon
)
@ -163,7 +165,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostCold",
1000,
"historical",
"cold"
"cold",
0
),
mockPeon
)
@ -228,7 +231,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostHot",
1000,
"historical",
"hot"
"hot",
0
),
mockPeon
),
@ -238,7 +242,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostHot2",
1000,
"historical",
"hot"
"hot",
0
),
mockPeon
)
@ -253,7 +258,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostCold",
1000,
"historical",
"cold"
"cold",
0
),
mockPeon
)
@ -311,7 +317,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm",
1000,
"historical",
"normal"
"normal",
0
);
for (DataSegment availableSegment : availableSegments) {
normServer.addDataSegment(availableSegment.getIdentifier(), availableSegment);
@ -328,7 +335,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostHot",
1000,
"historical",
"hot"
"hot",
0
),
mockPeon
)
@ -400,7 +408,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm",
1000,
"historical",
"normal"
"normal",
0
),
mockPeon
)
@ -450,7 +459,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm",
1000,
"historical",
"normal"
"normal",
0
),
mockPeon
)
@ -499,7 +509,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm",
1000,
"historical",
"normal"
"normal",
0
);
for (DataSegment segment : availableSegments) {
server.addDataSegment(segment.getIdentifier(), segment);
@ -560,7 +571,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm",
1000,
"historical",
"normal"
"normal",
0
);
server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
@ -569,7 +581,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm2",
1000,
"historical",
"normal"
"normal",
0
);
for (DataSegment segment : availableSegments) {
server2.addDataSegment(segment.getIdentifier(), segment);
@ -637,7 +650,8 @@ public class DruidCoordinatorRuleRunnerTest
"host1",
1000,
"historical",
"hot"
"hot",
0
);
server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
DruidServer server2 = new DruidServer(
@ -645,7 +659,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm2",
1000,
"historical",
"normal"
"normal",
0
);
for (DataSegment segment : availableSegments) {
server2.addDataSegment(segment.getIdentifier(), segment);
@ -716,14 +731,16 @@ public class DruidCoordinatorRuleRunnerTest
"host1",
1000,
"historical",
"hot"
"hot",
0
);
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
1000,
"historical",
"normal"
"normal",
0
);
for (DataSegment segment : availableSegments) {
server2.addDataSegment(segment.getIdentifier(), segment);
@ -786,7 +803,8 @@ public class DruidCoordinatorRuleRunnerTest
"host1",
1000,
"historical",
"normal"
"normal",
0
);
server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
DruidServer server2 = new DruidServer(
@ -794,7 +812,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm2",
1000,
"historical",
"normal"
"normal",
0
);
server2.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1));
DruidServer server3 = new DruidServer(
@ -802,7 +821,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm3",
1000,
"historical",
"normal"
"normal",
0
);
server3.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1));
server3.addDataSegment(availableSegments.get(2).getIdentifier(), availableSegments.get(2));
@ -893,7 +913,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostHot",
1000,
"historical",
"hot"
"hot",
0
),
mockPeon
),
@ -903,7 +924,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostHot2",
1000,
"historical",
"hot"
"hot",
0
),
mockPeon
)
@ -992,7 +1014,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm1",
1000,
"historical",
"normal"
"normal",
0
);
for (DataSegment availableSegment : longerAvailableSegments) {
server1.addDataSegment(availableSegment.getIdentifier(), availableSegment);
@ -1002,7 +1025,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm2",
1000,
"historical",
"normal"
"normal",
0
);
for (DataSegment availableSegment : longerAvailableSegments) {
server2.addDataSegment(availableSegment.getIdentifier(), availableSegment);

View File

@ -135,7 +135,8 @@ public class LoadRuleTest
"hostHot",
1000,
"historical",
"hot"
"hot",
0
),
mockPeon
)
@ -150,7 +151,8 @@ public class LoadRuleTest
"hostNorm",
1000,
"historical",
DruidServer.DEFAULT_TIER
DruidServer.DEFAULT_TIER,
0
),
mockPeon
)
@ -219,7 +221,8 @@ public class LoadRuleTest
"hostHot",
1000,
"historical",
"hot"
"hot",
0
);
server1.addDataSegment(segment.getIdentifier(), segment);
DruidServer server2 = new DruidServer(
@ -227,7 +230,8 @@ public class LoadRuleTest
"hostNorm",
1000,
"historical",
DruidServer.DEFAULT_TIER
DruidServer.DEFAULT_TIER,
0
);
server2.addDataSegment(segment.getIdentifier(), segment);
DruidCluster druidCluster = new DruidCluster(

View File

@ -11,19 +11,24 @@ import io.airlift.command.Command;
import io.druid.concurrent.Execs;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.announcement.Announcer;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.db.DatabaseSegmentManager;
import io.druid.db.DatabaseSegmentManagerConfig;
import io.druid.db.DatabaseSegmentManagerProvider;
import io.druid.guice.ConfigProvider;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.ManageLifecycleLast;
import io.druid.guice.NodeTypeConfig;
import io.druid.query.QuerySegmentWalker;
import io.druid.server.QueryResource;
import io.druid.server.bridge.Bridge;
import io.druid.server.bridge.BridgeCuratorConfig;
import io.druid.server.bridge.BridgeJettyServerInitializer;
import io.druid.server.bridge.BridgeQuerySegmentWalker;
import io.druid.server.bridge.BridgeZkCoordinator;
import io.druid.server.bridge.DruidClusterBridge;
import io.druid.server.bridge.DruidClusterBridgeConfig;
@ -44,7 +49,7 @@ import java.util.List;
*/
@Command(
name = "bridge",
description = "Runs a bridge node, see http://druid.io/docs/0.6.46/Bridge.html for a description." // TODO
description = "Runs a bridge node, see http://druid.io/docs/0.6.46/Bridge.html for a description."
)
public class CliBridge extends ServerRunnable
{
@ -74,11 +79,15 @@ public class CliBridge extends ServerRunnable
.toProvider(DatabaseSegmentManagerProvider.class)
.in(ManageLifecycle.class);
binder.bind(QuerySegmentWalker.class).to(BridgeQuerySegmentWalker.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
Jerseys.addResource(binder, QueryResource.class);
LifecycleModule.register(binder, QueryResource.class);
ConfigProvider.bind(binder, DruidClusterBridgeConfig.class);
binder.bind(DruidClusterBridge.class);
LifecycleModule.register(binder, DruidClusterBridge.class);
binder.bind(JettyServerInitializer.class).toInstance(new BridgeJettyServerInitializer());
LifecycleModule.register(binder, BridgeZkCoordinator.class);
LifecycleModule.register(binder, Server.class);
@ -123,6 +132,17 @@ public class CliBridge extends ServerRunnable
return framework;
}
@Provides
@ManageLifecycle
public ServerDiscoverySelector getServerDiscoverySelector(
DruidClusterBridgeConfig config,
ServerDiscoveryFactory factory
)
{
return factory.createSelector(config.getBrokerServiceName());
}
@Provides
@ManageLifecycle
@Bridge