diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 31a644cf887..f5b868e30fb 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -313,7 +313,7 @@ public class CachingClusteredClient implements QueryRunner final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors); List intervals = segmentSpec.getIntervals(); - if ("realtime".equals(server.getType()) || !populateCache || isBySegment) { + if (server.isRealtime() || !populateCache || isBySegment) { resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec)); } else { resultSeqToAdd = toolChest.mergeSequences( diff --git a/server/src/main/java/io/druid/client/DruidServer.java b/server/src/main/java/io/druid/client/DruidServer.java index a8599fab820..9cf0d992bd6 100644 --- a/server/src/main/java/io/druid/client/DruidServer.java +++ b/server/src/main/java/io/druid/client/DruidServer.java @@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentMap; */ public class DruidServer implements Comparable { + public static final int DEFAULT_PRIORITY = 0; public static final int DEFAULT_NUM_REPLICANTS = 2; public static final String DEFAULT_TIER = "_default_tier"; @@ -62,7 +63,7 @@ public class DruidServer implements Comparable config.getMaxSize(), type, config.getTier(), - 0 + DEFAULT_PRIORITY ); } @@ -135,6 +136,11 @@ public class DruidServer implements Comparable return Collections.unmodifiableMap(segments); } + public boolean isRealtime() + { + return getType().equalsIgnoreCase("realtime"); + } + public DataSegment getSegment(String segmentName) { return segments.get(segmentName); diff --git a/server/src/main/java/io/druid/client/DruidServerConfig.java b/server/src/main/java/io/druid/client/DruidServerConfig.java index 420596ff65e..f0ab6ded61d 100644 --- a/server/src/main/java/io/druid/client/DruidServerConfig.java +++ b/server/src/main/java/io/druid/client/DruidServerConfig.java @@ -35,7 +35,7 @@ public class DruidServerConfig private String tier = DruidServer.DEFAULT_TIER; @JsonProperty - private int priority = 0; + private int priority = DruidServer.DEFAULT_PRIORITY; public long getMaxSize() { diff --git a/server/src/main/java/io/druid/client/selector/ConnectionCountServerSelectorStrategy.java b/server/src/main/java/io/druid/client/selector/ConnectionCountServerSelectorStrategy.java index 8a75e5403cf..4376ac01137 100644 --- a/server/src/main/java/io/druid/client/selector/ConnectionCountServerSelectorStrategy.java +++ b/server/src/main/java/io/druid/client/selector/ConnectionCountServerSelectorStrategy.java @@ -20,10 +20,14 @@ package io.druid.client.selector; import com.google.common.primitives.Ints; +import com.metamx.common.ISE; +import io.druid.timeline.DataSegment; import java.util.Collections; import java.util.Comparator; +import java.util.Map; import java.util.Set; +import java.util.TreeMap; public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy { @@ -37,8 +41,25 @@ public class ConnectionCountServerSelectorStrategy implements ServerSelectorStra }; @Override - public QueryableDruidServer pick(Set servers) + public QueryableDruidServer pick( + TreeMap> prioritizedServers, DataSegment segment + ) { - return Collections.min(servers, comparator); + final Map.Entry> highestPriorityServers = prioritizedServers.pollLastEntry(); + + if (highestPriorityServers == null) { + return null; + } + + final Set servers = highestPriorityServers.getValue(); + final int size = servers.size(); + switch (size) { + case 0: + throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier()); + case 1: + return highestPriorityServers.getValue().iterator().next(); + default: + return Collections.min(servers, comparator); + } } } diff --git a/server/src/main/java/io/druid/client/selector/RandomServerSelectorStrategy.java b/server/src/main/java/io/druid/client/selector/RandomServerSelectorStrategy.java index 25e27ce50df..0e295211484 100644 --- a/server/src/main/java/io/druid/client/selector/RandomServerSelectorStrategy.java +++ b/server/src/main/java/io/druid/client/selector/RandomServerSelectorStrategy.java @@ -20,17 +20,36 @@ package io.druid.client.selector; import com.google.common.collect.Iterators; +import com.metamx.common.ISE; +import io.druid.timeline.DataSegment; +import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.TreeMap; public class RandomServerSelectorStrategy implements ServerSelectorStrategy { private static final Random random = new Random(); @Override - public QueryableDruidServer pick(Set servers) + public QueryableDruidServer pick(TreeMap> prioritizedServers, DataSegment segment) { - return Iterators.get(servers.iterator(), random.nextInt(servers.size())); + final Map.Entry> highestPriorityServers = prioritizedServers.pollLastEntry(); + + if (highestPriorityServers == null) { + return null; + } + + final Set servers = highestPriorityServers.getValue(); + final int size = servers.size(); + switch (size) { + case 0: + throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier()); + case 1: + return highestPriorityServers.getValue().iterator().next(); + default: + return Iterators.get(servers.iterator(), random.nextInt(size)); + } } } diff --git a/server/src/main/java/io/druid/client/selector/ServerSelector.java b/server/src/main/java/io/druid/client/selector/ServerSelector.java index 6a6dcc9745b..232a722e563 100644 --- a/server/src/main/java/io/druid/client/selector/ServerSelector.java +++ b/server/src/main/java/io/druid/client/selector/ServerSelector.java @@ -20,19 +20,10 @@ package io.druid.client.selector; import com.google.api.client.util.Maps; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; -import com.google.common.collect.Ordering; import com.google.common.collect.Sets; -import com.google.common.collect.TreeMultimap; -import com.google.common.primitives.Ints; +import com.metamx.emitter.EmittingLogger; import io.druid.timeline.DataSegment; -import javax.annotation.Nullable; -import java.util.Collections; -import java.util.Comparator; import java.util.Set; import java.util.TreeMap; @@ -40,6 +31,9 @@ import java.util.TreeMap; */ public class ServerSelector implements DiscoverySelector { + + private static final EmittingLogger log = new EmittingLogger(ServerSelector.class); + private final Set servers = Sets.newHashSet(); private final DataSegment segment; @@ -85,26 +79,17 @@ public class ServerSelector implements DiscoverySelector public QueryableDruidServer pick() { synchronized (this) { - TreeMap> prioritzedServers = Maps.newTreeMap(); + TreeMap> prioritizedServers = Maps.newTreeMap(); for (QueryableDruidServer server : servers) { - Set theServers = prioritzedServers.get(server.getServer().getPriority()); + Set theServers = prioritizedServers.get(server.getServer().getPriority()); if (theServers == null) { theServers = Sets.newHashSet(); - prioritzedServers.put(server.getServer().getPriority(), theServers); + prioritizedServers.put(server.getServer().getPriority(), theServers); } theServers.add(server); } - final Set highestPriorityServers = prioritzedServers.pollLastEntry().getValue(); - final int size = highestPriorityServers.size(); - switch (size) { - case 0: - return null; - case 1: - return highestPriorityServers.iterator().next(); - default: - return strategy.pick(highestPriorityServers); - } + return strategy.pick(prioritizedServers, segment); } } } diff --git a/server/src/main/java/io/druid/client/selector/ServerSelectorStrategy.java b/server/src/main/java/io/druid/client/selector/ServerSelectorStrategy.java index d4684d28e08..065253061d8 100644 --- a/server/src/main/java/io/druid/client/selector/ServerSelectorStrategy.java +++ b/server/src/main/java/io/druid/client/selector/ServerSelectorStrategy.java @@ -21,8 +21,10 @@ package io.druid.client.selector; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.druid.timeline.DataSegment; import java.util.Set; +import java.util.TreeMap; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class) @JsonSubTypes(value = { @@ -31,5 +33,5 @@ import java.util.Set; }) public interface ServerSelectorStrategy { - public QueryableDruidServer pick(Set servers); + public QueryableDruidServer pick(TreeMap> prioritizedServers, DataSegment segment); } diff --git a/server/src/main/java/io/druid/db/DatabaseSegmentManager.java b/server/src/main/java/io/druid/db/DatabaseSegmentManager.java index eee1b0db360..930322106f8 100644 --- a/server/src/main/java/io/druid/db/DatabaseSegmentManager.java +++ b/server/src/main/java/io/druid/db/DatabaseSegmentManager.java @@ -339,50 +339,6 @@ public class DatabaseSegmentManager return true; } - public boolean deleteSegment(final DataSegment segment) - { - try { - final String ds = segment.getDataSource(); - - dbi.withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - handle.createStatement( - String.format("DELETE from %s WHERE id = :id", getSegmentsTable()) - ) - .bind("id", segment.getIdentifier()) - .execute(); - - return null; - } - } - ); - - ConcurrentHashMap dataSourceMap = dataSources.get(); - - if (!dataSourceMap.containsKey(ds)) { - log.warn("Cannot find datasource %s", ds); - return false; - } - - DruidDataSource dataSource = dataSourceMap.get(ds); - dataSource.removePartition(segment.getIdentifier()); - - if (dataSource.isEmpty()) { - dataSourceMap.remove(ds); - } - } - catch (Exception e) { - log.error(e, e.toString()); - return false; - } - - return true; - } - public boolean isStarted() { return started; diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index aa7bb3498d6..2b4d1f5f44b 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -704,7 +704,7 @@ public class RealtimePlumber implements Plumber return ServerView.CallbackAction.UNREGISTER; } - if ("realtime".equals(server.getType())) { + if (server.isRealtime()) { return ServerView.CallbackAction.CONTINUE; } diff --git a/server/src/main/java/io/druid/server/bridge/BridgeQuerySegmentWalker.java b/server/src/main/java/io/druid/server/bridge/BridgeQuerySegmentWalker.java index c892989ab15..d7e4674fab2 100644 --- a/server/src/main/java/io/druid/server/bridge/BridgeQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/bridge/BridgeQuerySegmentWalker.java @@ -25,6 +25,7 @@ import com.google.common.base.Charsets; import com.google.inject.Inject; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; @@ -44,6 +45,8 @@ import java.util.List; */ public class BridgeQuerySegmentWalker implements QuerySegmentWalker { + private static final Logger log = new Logger(BridgeQuerySegmentWalker.class); + private final ServerDiscoverySelector brokerSelector; private final HttpClient httpClient; private final ObjectMapper jsonMapper; @@ -113,6 +116,8 @@ public class BridgeQuerySegmentWalker implements QuerySegmentWalker return Sequences.simple(results); } catch (Exception e) { + log.error(e, "Exception with bridge query"); + return Sequences.empty(); } } diff --git a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java index b0e9af83f68..f4073a18678 100644 --- a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java +++ b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java @@ -41,6 +41,7 @@ import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Self; import io.druid.server.DruidNode; import io.druid.server.coordination.AbstractDataSegmentAnnouncer; +import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; @@ -51,6 +52,7 @@ import org.apache.curator.utils.ZKPaths; import java.io.IOException; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -110,13 +112,15 @@ public class DruidClusterBridge this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); this.self = self; + ExecutorService serverInventoryViewExec = Executors.newFixedThreadPool( + 1, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("DruidClusterBridge-ServerInventoryView-%d") + .build() + ); + serverInventoryView.registerSegmentCallback( - Executors.newFixedThreadPool( - 1, - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("DruidClusterBridge-ServerInventoryView-%d") - .build() - ), + serverInventoryViewExec, new ServerView.BaseSegmentCallback() { @Override @@ -147,15 +151,7 @@ public class DruidClusterBridge { try { synchronized (lock) { - Integer count = segments.get(segment); - if (count != null) { - if (count == 1) { - dataSegmentAnnouncer.unannounceSegment(segment); - segments.remove(segment); - } else { - segments.put(segment, count - 1); - } - } + serverRemovedSegment(dataSegmentAnnouncer, segment, server); } } catch (Exception e) { @@ -166,6 +162,27 @@ public class DruidClusterBridge } } ); + + serverInventoryView.registerServerCallback( + serverInventoryViewExec, + new ServerView.ServerCallback() + { + @Override + public ServerView.CallbackAction serverRemoved(DruidServer server) + { + try { + for (DataSegment dataSegment : server.getSegments().values()) { + serverRemovedSegment(dataSegmentAnnouncer, dataSegment, server); + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + return ServerView.CallbackAction.CONTINUE; + } + } + + ); } public boolean isLeader() @@ -274,7 +291,7 @@ public class DruidClusterBridge DruidServer input ) { - return !input.getType().equalsIgnoreCase("realtime"); + return !input.isRealtime(); } } ); @@ -306,7 +323,7 @@ public class DruidClusterBridge } } } - if (leader) { // (We might no longer be coordinator) + if (leader) { // (We might no longer be leader) return ScheduledExecutors.Signal.REPEAT; } else { return ScheduledExecutors.Signal.STOP; @@ -326,7 +343,7 @@ public class DruidClusterBridge leaderLatch.get().start(); } catch (Exception e1) { - // If an exception gets thrown out here, then the coordinator will zombie out 'cause it won't be looking for + // If an exception gets thrown out here, then the bridge will zombie out 'cause it won't be looking for // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but // Curator likes to have "throws Exception" on methods so it might happen... log.makeAlert(e1, "I am a zombie") @@ -352,4 +369,23 @@ public class DruidClusterBridge } } } + + private void serverRemovedSegment(DataSegmentAnnouncer dataSegmentAnnouncer, DataSegment segment, DruidServer server) + throws IOException + { + Integer count = segments.get(segment); + if (count != null) { + if (count == 1) { + dataSegmentAnnouncer.unannounceSegment(segment); + segments.remove(segment); + } else { + segments.put(segment, count - 1); + } + } else { + log.makeAlert("Trying to remove a segment that was never added?") + .addData("server", server.getHost()) + .addData("segmentId", segment.getIdentifier()) + .emit(); + } + } } diff --git a/server/src/main/java/io/druid/server/bridge/DruidClusterBridgeConfig.java b/server/src/main/java/io/druid/server/bridge/DruidClusterBridgeConfig.java index 8a98e3bb593..5478a375577 100644 --- a/server/src/main/java/io/druid/server/bridge/DruidClusterBridgeConfig.java +++ b/server/src/main/java/io/druid/server/bridge/DruidClusterBridgeConfig.java @@ -45,6 +45,8 @@ public abstract class DruidClusterBridgeConfig extends ZkPathsConfig public abstract String getBrokerServiceName(); @Config("druid.server.priority") - @Default("0") - public abstract int getPriority(); + public int getPriority() + { + return DruidServer.DEFAULT_PRIORITY; + } } diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index b63f95a8f7d..b11d2758035 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -577,7 +577,7 @@ public class DruidCoordinator if (leader) { theRunnable.run(); } - if (leader) { // (We might no longer be coordinator) + if (leader) { // (We might no longer be leader) return ScheduledExecutors.Signal.REPEAT; } else { return ScheduledExecutors.Signal.STOP; @@ -777,7 +777,7 @@ public class DruidCoordinator DruidServer input ) { - return !input.getType().equalsIgnoreCase("realtime"); + return !input.isRealtime(); } } ); diff --git a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java index a8d9dd69b55..b7ab0e19714 100644 --- a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java +++ b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java @@ -176,6 +176,10 @@ public class DruidClusterBridgeTest EasyMock.anyObject(), EasyMock.anyObject() ); + batchServerInventoryView.registerServerCallback( + EasyMock.anyObject(), + EasyMock.anyObject() + ); EasyMock.expectLastCall(); batchServerInventoryView.start(); EasyMock.expectLastCall(); diff --git a/services/src/main/java/io/druid/cli/CliBridge.java b/services/src/main/java/io/druid/cli/CliBridge.java index 13bac634422..ffed4789727 100644 --- a/services/src/main/java/io/druid/cli/CliBridge.java +++ b/services/src/main/java/io/druid/cli/CliBridge.java @@ -49,7 +49,7 @@ import java.util.List; */ @Command( name = "bridge", - description = "Runs a bridge node, see http://druid.io/docs/0.6.46/Bridge.html for a description." + description = "This is a highly experimental node to use at your own discretion" ) public class CliBridge extends ServerRunnable {