fix according to code review

This commit is contained in:
fjy 2014-01-21 16:26:28 -08:00
parent 16f5f1d9cb
commit f3e1cbc423
15 changed files with 135 additions and 99 deletions

View File

@ -313,7 +313,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors); final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors);
List<Interval> intervals = segmentSpec.getIntervals(); List<Interval> intervals = segmentSpec.getIntervals();
if ("realtime".equals(server.getType()) || !populateCache || isBySegment) { if (server.isRealtime() || !populateCache || isBySegment) {
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec)); resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec));
} else { } else {
resultSeqToAdd = toolChest.mergeSequences( resultSeqToAdd = toolChest.mergeSequences(

View File

@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentMap;
*/ */
public class DruidServer implements Comparable public class DruidServer implements Comparable
{ {
public static final int DEFAULT_PRIORITY = 0;
public static final int DEFAULT_NUM_REPLICANTS = 2; public static final int DEFAULT_NUM_REPLICANTS = 2;
public static final String DEFAULT_TIER = "_default_tier"; public static final String DEFAULT_TIER = "_default_tier";
@ -62,7 +63,7 @@ public class DruidServer implements Comparable
config.getMaxSize(), config.getMaxSize(),
type, type,
config.getTier(), config.getTier(),
0 DEFAULT_PRIORITY
); );
} }
@ -135,6 +136,11 @@ public class DruidServer implements Comparable
return Collections.unmodifiableMap(segments); return Collections.unmodifiableMap(segments);
} }
public boolean isRealtime()
{
return getType().equalsIgnoreCase("realtime");
}
public DataSegment getSegment(String segmentName) public DataSegment getSegment(String segmentName)
{ {
return segments.get(segmentName); return segments.get(segmentName);

View File

@ -35,7 +35,7 @@ public class DruidServerConfig
private String tier = DruidServer.DEFAULT_TIER; private String tier = DruidServer.DEFAULT_TIER;
@JsonProperty @JsonProperty
private int priority = 0; private int priority = DruidServer.DEFAULT_PRIORITY;
public long getMaxSize() public long getMaxSize()
{ {

View File

@ -20,10 +20,14 @@
package io.druid.client.selector; package io.druid.client.selector;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import io.druid.timeline.DataSegment;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy
{ {
@ -37,8 +41,25 @@ public class ConnectionCountServerSelectorStrategy implements ServerSelectorStra
}; };
@Override @Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers) public QueryableDruidServer pick(
TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment
)
{ {
return Collections.min(servers, comparator); final Map.Entry<Integer, Set<QueryableDruidServer>> highestPriorityServers = prioritizedServers.pollLastEntry();
if (highestPriorityServers == null) {
return null;
}
final Set<QueryableDruidServer> servers = highestPriorityServers.getValue();
final int size = servers.size();
switch (size) {
case 0:
throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier());
case 1:
return highestPriorityServers.getValue().iterator().next();
default:
return Collections.min(servers, comparator);
}
} }
} }

View File

@ -20,17 +20,36 @@
package io.druid.client.selector; package io.druid.client.selector;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.metamx.common.ISE;
import io.druid.timeline.DataSegment;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
public class RandomServerSelectorStrategy implements ServerSelectorStrategy public class RandomServerSelectorStrategy implements ServerSelectorStrategy
{ {
private static final Random random = new Random(); private static final Random random = new Random();
@Override @Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers) public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment)
{ {
return Iterators.get(servers.iterator(), random.nextInt(servers.size())); final Map.Entry<Integer, Set<QueryableDruidServer>> highestPriorityServers = prioritizedServers.pollLastEntry();
if (highestPriorityServers == null) {
return null;
}
final Set<QueryableDruidServer> servers = highestPriorityServers.getValue();
final int size = servers.size();
switch (size) {
case 0:
throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier());
case 1:
return highestPriorityServers.getValue().iterator().next();
default:
return Iterators.get(servers.iterator(), random.nextInt(size));
}
} }
} }

View File

@ -20,19 +20,10 @@
package io.druid.client.selector; package io.druid.client.selector;
import com.google.api.client.util.Maps; import com.google.api.client.util.Maps;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap; import com.metamx.emitter.EmittingLogger;
import com.google.common.primitives.Ints;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Comparator;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
@ -40,6 +31,9 @@ import java.util.TreeMap;
*/ */
public class ServerSelector implements DiscoverySelector<QueryableDruidServer> public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
{ {
private static final EmittingLogger log = new EmittingLogger(ServerSelector.class);
private final Set<QueryableDruidServer> servers = Sets.newHashSet(); private final Set<QueryableDruidServer> servers = Sets.newHashSet();
private final DataSegment segment; private final DataSegment segment;
@ -85,26 +79,17 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
public QueryableDruidServer pick() public QueryableDruidServer pick()
{ {
synchronized (this) { synchronized (this) {
TreeMap<Integer, Set<QueryableDruidServer>> prioritzedServers = Maps.newTreeMap(); TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers = Maps.newTreeMap();
for (QueryableDruidServer server : servers) { for (QueryableDruidServer server : servers) {
Set<QueryableDruidServer> theServers = prioritzedServers.get(server.getServer().getPriority()); Set<QueryableDruidServer> theServers = prioritizedServers.get(server.getServer().getPriority());
if (theServers == null) { if (theServers == null) {
theServers = Sets.newHashSet(); theServers = Sets.newHashSet();
prioritzedServers.put(server.getServer().getPriority(), theServers); prioritizedServers.put(server.getServer().getPriority(), theServers);
} }
theServers.add(server); theServers.add(server);
} }
final Set<QueryableDruidServer> highestPriorityServers = prioritzedServers.pollLastEntry().getValue(); return strategy.pick(prioritizedServers, segment);
final int size = highestPriorityServers.size();
switch (size) {
case 0:
return null;
case 1:
return highestPriorityServers.iterator().next();
default:
return strategy.pick(highestPriorityServers);
}
} }
} }
} }

View File

@ -21,8 +21,10 @@ package io.druid.client.selector;
import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.timeline.DataSegment;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class) @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class)
@JsonSubTypes(value = { @JsonSubTypes(value = {
@ -31,5 +33,5 @@ import java.util.Set;
}) })
public interface ServerSelectorStrategy public interface ServerSelectorStrategy
{ {
public QueryableDruidServer pick(Set<QueryableDruidServer> servers); public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment);
} }

View File

@ -339,50 +339,6 @@ public class DatabaseSegmentManager
return true; return true;
} }
public boolean deleteSegment(final DataSegment segment)
{
try {
final String ds = segment.getDataSource();
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format("DELETE from %s WHERE id = :id", getSegmentsTable())
)
.bind("id", segment.getIdentifier())
.execute();
return null;
}
}
);
ConcurrentHashMap<String, DruidDataSource> dataSourceMap = dataSources.get();
if (!dataSourceMap.containsKey(ds)) {
log.warn("Cannot find datasource %s", ds);
return false;
}
DruidDataSource dataSource = dataSourceMap.get(ds);
dataSource.removePartition(segment.getIdentifier());
if (dataSource.isEmpty()) {
dataSourceMap.remove(ds);
}
}
catch (Exception e) {
log.error(e, e.toString());
return false;
}
return true;
}
public boolean isStarted() public boolean isStarted()
{ {
return started; return started;

View File

@ -704,7 +704,7 @@ public class RealtimePlumber implements Plumber
return ServerView.CallbackAction.UNREGISTER; return ServerView.CallbackAction.UNREGISTER;
} }
if ("realtime".equals(server.getType())) { if (server.isRealtime()) {
return ServerView.CallbackAction.CONTINUE; return ServerView.CallbackAction.CONTINUE;
} }

View File

@ -25,6 +25,7 @@ import com.google.common.base.Charsets;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder; import com.metamx.http.client.response.StatusResponseHolder;
@ -44,6 +45,8 @@ import java.util.List;
*/ */
public class BridgeQuerySegmentWalker implements QuerySegmentWalker public class BridgeQuerySegmentWalker implements QuerySegmentWalker
{ {
private static final Logger log = new Logger(BridgeQuerySegmentWalker.class);
private final ServerDiscoverySelector brokerSelector; private final ServerDiscoverySelector brokerSelector;
private final HttpClient httpClient; private final HttpClient httpClient;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
@ -113,6 +116,8 @@ public class BridgeQuerySegmentWalker implements QuerySegmentWalker
return Sequences.simple(results); return Sequences.simple(results);
} }
catch (Exception e) { catch (Exception e) {
log.error(e, "Exception with bridge query");
return Sequences.empty(); return Sequences.empty();
} }
} }

View File

@ -41,6 +41,7 @@ import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self; import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode; import io.druid.server.DruidNode;
import io.druid.server.coordination.AbstractDataSegmentAnnouncer; import io.druid.server.coordination.AbstractDataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -51,6 +52,7 @@ import org.apache.curator.utils.ZKPaths;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -110,13 +112,15 @@ public class DruidClusterBridge
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
this.self = self; this.self = self;
ExecutorService serverInventoryViewExec = Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("DruidClusterBridge-ServerInventoryView-%d")
.build()
);
serverInventoryView.registerSegmentCallback( serverInventoryView.registerSegmentCallback(
Executors.newFixedThreadPool( serverInventoryViewExec,
1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("DruidClusterBridge-ServerInventoryView-%d")
.build()
),
new ServerView.BaseSegmentCallback() new ServerView.BaseSegmentCallback()
{ {
@Override @Override
@ -147,15 +151,7 @@ public class DruidClusterBridge
{ {
try { try {
synchronized (lock) { synchronized (lock) {
Integer count = segments.get(segment); serverRemovedSegment(dataSegmentAnnouncer, segment, server);
if (count != null) {
if (count == 1) {
dataSegmentAnnouncer.unannounceSegment(segment);
segments.remove(segment);
} else {
segments.put(segment, count - 1);
}
}
} }
} }
catch (Exception e) { catch (Exception e) {
@ -166,6 +162,27 @@ public class DruidClusterBridge
} }
} }
); );
serverInventoryView.registerServerCallback(
serverInventoryViewExec,
new ServerView.ServerCallback()
{
@Override
public ServerView.CallbackAction serverRemoved(DruidServer server)
{
try {
for (DataSegment dataSegment : server.getSegments().values()) {
serverRemovedSegment(dataSegmentAnnouncer, dataSegment, server);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return ServerView.CallbackAction.CONTINUE;
}
}
);
} }
public boolean isLeader() public boolean isLeader()
@ -274,7 +291,7 @@ public class DruidClusterBridge
DruidServer input DruidServer input
) )
{ {
return !input.getType().equalsIgnoreCase("realtime"); return !input.isRealtime();
} }
} }
); );
@ -306,7 +323,7 @@ public class DruidClusterBridge
} }
} }
} }
if (leader) { // (We might no longer be coordinator) if (leader) { // (We might no longer be leader)
return ScheduledExecutors.Signal.REPEAT; return ScheduledExecutors.Signal.REPEAT;
} else { } else {
return ScheduledExecutors.Signal.STOP; return ScheduledExecutors.Signal.STOP;
@ -326,7 +343,7 @@ public class DruidClusterBridge
leaderLatch.get().start(); leaderLatch.get().start();
} }
catch (Exception e1) { catch (Exception e1) {
// If an exception gets thrown out here, then the coordinator will zombie out 'cause it won't be looking for // If an exception gets thrown out here, then the bridge will zombie out 'cause it won't be looking for
// the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but
// Curator likes to have "throws Exception" on methods so it might happen... // Curator likes to have "throws Exception" on methods so it might happen...
log.makeAlert(e1, "I am a zombie") log.makeAlert(e1, "I am a zombie")
@ -352,4 +369,23 @@ public class DruidClusterBridge
} }
} }
} }
private void serverRemovedSegment(DataSegmentAnnouncer dataSegmentAnnouncer, DataSegment segment, DruidServer server)
throws IOException
{
Integer count = segments.get(segment);
if (count != null) {
if (count == 1) {
dataSegmentAnnouncer.unannounceSegment(segment);
segments.remove(segment);
} else {
segments.put(segment, count - 1);
}
} else {
log.makeAlert("Trying to remove a segment that was never added?")
.addData("server", server.getHost())
.addData("segmentId", segment.getIdentifier())
.emit();
}
}
} }

View File

@ -45,6 +45,8 @@ public abstract class DruidClusterBridgeConfig extends ZkPathsConfig
public abstract String getBrokerServiceName(); public abstract String getBrokerServiceName();
@Config("druid.server.priority") @Config("druid.server.priority")
@Default("0") public int getPriority()
public abstract int getPriority(); {
return DruidServer.DEFAULT_PRIORITY;
}
} }

View File

@ -577,7 +577,7 @@ public class DruidCoordinator
if (leader) { if (leader) {
theRunnable.run(); theRunnable.run();
} }
if (leader) { // (We might no longer be coordinator) if (leader) { // (We might no longer be leader)
return ScheduledExecutors.Signal.REPEAT; return ScheduledExecutors.Signal.REPEAT;
} else { } else {
return ScheduledExecutors.Signal.STOP; return ScheduledExecutors.Signal.STOP;
@ -777,7 +777,7 @@ public class DruidCoordinator
DruidServer input DruidServer input
) )
{ {
return !input.getType().equalsIgnoreCase("realtime"); return !input.isRealtime();
} }
} }
); );

View File

@ -176,6 +176,10 @@ public class DruidClusterBridgeTest
EasyMock.<Executor>anyObject(), EasyMock.<Executor>anyObject(),
EasyMock.<ServerView.SegmentCallback>anyObject() EasyMock.<ServerView.SegmentCallback>anyObject()
); );
batchServerInventoryView.registerServerCallback(
EasyMock.<Executor>anyObject(),
EasyMock.<ServerView.ServerCallback>anyObject()
);
EasyMock.expectLastCall(); EasyMock.expectLastCall();
batchServerInventoryView.start(); batchServerInventoryView.start();
EasyMock.expectLastCall(); EasyMock.expectLastCall();

View File

@ -49,7 +49,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "bridge", name = "bridge",
description = "Runs a bridge node, see http://druid.io/docs/0.6.46/Bridge.html for a description." description = "This is a highly experimental node to use at your own discretion"
) )
public class CliBridge extends ServerRunnable public class CliBridge extends ServerRunnable
{ {