bridge is kinda working for mastery stuff right now, tests pass

fjy 2014-01-06 17:01:50 -08:00
parent 98d01b1a9c
commit e11952f3b6
23 changed files with 1022 additions and 178 deletions

View File

@ -129,7 +129,6 @@ This describes the data schema for the output Druid segment. More information ab
|aggregators|Array of Objects|The list of aggregators to use to aggregate colliding rows together.|yes|
|dataSource|String|The name of the dataSource that the segment belongs to.|yes|
|indexGranularity|String|The granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows.|yes|
|segmentGranularity|String|The granularity of the segment as a whole. This is generally larger than the index granularity and describes the rate at which the realtime server will push segments out for historical servers to take over.|yes|
|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime nodes indexing the same data stream in a sharded fashion.|no|
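For orientation, a minimal hypothetical snippet using the fields in this table might look as follows; the dataSource name, aggregator, and shardSpec values are illustrative and not taken from this commit:

{
  "dataSource": "example_datasource",
  "aggregators": [ { "type": "count", "name": "rows" } ],
  "indexGranularity": "minute",
  "shardSpec": { "type": "none" }
}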
### Config

View File

@ -63,6 +63,7 @@ public class Announcer
private final PathChildrenCacheFactory factory;
private final List<Pair<String, byte[]>> toAnnounce = Lists.newArrayList();
private final List<Pair<String, byte[]>> toUpdate = Lists.newArrayList();
private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap();
private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new MapMaker().makeMap();
private final List<String> parentsIBuilt = new CopyOnWriteArrayList<String>();
@ -92,6 +93,11 @@ public class Announcer
announce(pair.lhs, pair.rhs);
}
toAnnounce.clear();
for (Pair<String, byte[]> pair : toUpdate) {
update(pair.lhs, pair.rhs);
}
toUpdate.clear();
}
}
@ -144,6 +150,8 @@ public class Announcer
}
}
log.info("Announcing with Curator %s", curator); // TODO
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final String parentPath = pathAndNode.getPath();
@ -268,6 +276,13 @@ public class Announcer
public void update(final String path, final byte[] bytes)
{
synchronized (toAnnounce) {
if (!started) {
toUpdate.add(Pair.of(path, bytes));
return;
}
}
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final String parentPath = pathAndNode.getPath();
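The new toUpdate queue mirrors the existing toAnnounce queue: an update() issued before the Announcer has started is buffered and replayed once it starts. A minimal, hypothetical usage sketch of that behavior (the path and payload are made up for illustration):

Announcer announcer = new Announcer(curator, Execs.singleThreaded("BridgeAnnouncer-%s"));
byte[] payload = "{\"maxSize\":0}".getBytes();                        // hypothetical payload
announcer.update("/druid/announcements/example-host:8080", payload); // not started yet: queued in toUpdate
announcer.start();                                                   // replays queued announce()/update() calls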

View File

@ -339,6 +339,50 @@ public class DatabaseSegmentManager
return true;
}
public boolean deleteSegment(final DataSegment segment)
{
try {
final String ds = segment.getDataSource();
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format("DELETE from %s WHERE id = :id", getSegmentsTable())
)
.bind("id", segment.getIdentifier())
.execute();
return null;
}
}
);
ConcurrentHashMap<String, DruidDataSource> dataSourceMap = dataSources.get();
if (!dataSourceMap.containsKey(ds)) {
log.warn("Cannot find datasource %s", ds);
return false;
}
DruidDataSource dataSource = dataSourceMap.get(ds);
dataSource.removePartition(segment.getIdentifier());
if (dataSource.isEmpty()) {
dataSourceMap.remove(ds);
}
}
catch (Exception e) {
log.error(e, e.toString());
return false;
}
return true;
}
public boolean isStarted()
{
return started;
@ -462,7 +506,8 @@ public class DatabaseSegmentManager
}
}
private String getSegmentsTable()
{
return dbTables.get().getSegmentsTable();
}
}
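A hedged usage sketch of the new deleteSegment() method: it deletes the segment's row from the segments table and prunes it from the in-memory datasource map, returning false if the datasource is unknown or the delete fails. The manager and segment variables below stand in for objects obtained elsewhere:

boolean deleted = manager.deleteSegment(segment);
if (!deleted) {
  log.warn("Could not delete segment[%s] from the metadata store", segment.getIdentifier());
}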

View File

@ -0,0 +1,17 @@
package io.druid.server.bridge;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*/
@BindingAnnotation
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Bridge
{
}
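The @Bridge binding annotation lets Guice keep parent-cluster bindings (the parent Curator client, the parent Announcer) separate from the node's own. A hypothetical injection-point sketch; the class name is made up, while the pattern mirrors BridgeZkCoordinator and CliBridge later in this commit:

public class ParentClusterClient
{
  private final CuratorFramework parentCurator;

  @Inject
  public ParentClusterClient(@Bridge CuratorFramework parentCurator)
  {
    // this Curator client points at the parent cluster's ZooKeeper, not the sub-cluster's
    this.parentCurator = parentCurator;
  }
}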

View File

@ -0,0 +1,12 @@
package io.druid.server.bridge;
import io.druid.curator.CuratorConfig;
import org.skife.config.Config;
/**
*/
public abstract class BridgeCuratorConfig extends CuratorConfig
{
@Config("druid.bridge.zk.service.host")
public abstract String getParentZkHosts();
}
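A hypothetical properties sketch for the new setting (host and port are illustrative): it points the bridge at the parent cluster's ZooKeeper ensemble, while the inherited CuratorConfig settings continue to describe the sub-cluster's own ensemble.

druid.bridge.zk.service.host=parent-zk.example.com:2181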

View File

@ -0,0 +1,38 @@
package io.druid.server.bridge;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import io.druid.server.initialization.JettyServerInitializer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
/**
*/
public class BridgeJettyServerInitializer implements JettyServerInitializer
{
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
ServletHolder holderPwd = new ServletHolder("default", DefaultServlet.class);
root.addServlet(holderPwd, "/");
//root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
root.addFilter(GzipFilter.class, "/*", null);
// Can't use '/*' here because of Guice and Jetty static content conflicts
// The coordinator really needs a standardized api path
root.addFilter(GuiceFilter.class, "/status/*", null);
HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{root});
server.setHandler(handlerList);
}
}

View File

@ -0,0 +1,121 @@
package io.druid.server.bridge;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.concurrent.Execs;
import io.druid.db.DatabaseSegmentManager;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.realtime.DbSegmentPublisher;
import io.druid.server.coordination.BaseZkCoordinator;
import io.druid.server.coordination.DataSegmentChangeCallback;
import io.druid.server.coordination.DataSegmentChangeHandler;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.ExecutorService;
/**
*/
public class BridgeZkCoordinator extends BaseZkCoordinator
{
private static final Logger log = new Logger(BridgeZkCoordinator.class);
private final DbSegmentPublisher dbSegmentPublisher;
private final DatabaseSegmentManager databaseSegmentManager;
private final ServerView serverView;
private final ExecutorService exec = Execs.singleThreaded("BridgeZkCoordinatorServerView-%s");
@Inject
public BridgeZkCoordinator(
ObjectMapper jsonMapper,
ZkPathsConfig zkPaths,
DruidServerMetadata me,
@Bridge CuratorFramework curator,
DbSegmentPublisher dbSegmentPublisher,
DatabaseSegmentManager databaseSegmentManager,
ServerView serverView
)
{
super(jsonMapper, zkPaths, me, curator);
this.dbSegmentPublisher = dbSegmentPublisher;
this.databaseSegmentManager = databaseSegmentManager;
this.serverView = serverView;
}
@Override
public void createCacheDir()
{
// do nothing
}
@Override
public void loadCache()
{
// do nothing
}
@Override
public DataSegmentChangeHandler getDataSegmentChangeHandler()
{
return BridgeZkCoordinator.this;
}
@Override
public void addSegment(final DataSegment segment, final DataSegmentChangeCallback callback)
{
try {
log.info("Publishing segment %s", segment.getIdentifier());
dbSegmentPublisher.publishSegment(segment);
serverView.registerSegmentCallback(
exec,
new ServerView.BaseSegmentCallback()
{
@Override
public ServerView.CallbackAction segmentAdded(
DruidServer server, DataSegment theSegment
)
{
if (theSegment.equals(segment)) {
callback.execute();
log.info("Callback executed");
}
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public void removeSegment(final DataSegment segment, final DataSegmentChangeCallback callback)
{
databaseSegmentManager.removeSegment(segment.getDataSource(), segment.getIdentifier());
serverView.registerSegmentCallback(
exec,
new ServerView.BaseSegmentCallback()
{
@Override
public ServerView.CallbackAction segmentRemoved(
DruidServer server, DataSegment theSegment
)
{
if (theSegment.equals(segment)) {
callback.execute();
}
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
}

View File

@ -0,0 +1,334 @@
package io.druid.server.bridge;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.DruidServer;
import io.druid.client.ServerInventoryView;
import io.druid.client.ServerView;
import io.druid.concurrent.Execs;
import io.druid.curator.announcement.Announcer;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;
import io.druid.server.coordination.AbstractDataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
@ManageLifecycle
public class DruidClusterBridge
{
public static final String CONNECTOR_OWNER_NODE = "_CONNECTOR";
public static final String NODE_TYPE = "bridge";
private static final EmittingLogger log = new EmittingLogger(DruidClusterBridge.class);
private final ObjectMapper jsonMapper;
private final DruidClusterBridgeConfig config;
private final BridgeZkCoordinator bridgeZkCoordinator; // watches for assignments from main cluster
private final Announcer announcer; //announce self to main cluster
private final ServerInventoryView<Object> serverInventoryView;
private final CuratorFramework curator;
private final ScheduledExecutorService exec;
private final AtomicReference<LeaderLatch> leaderLatch;
private final DruidNode self;
private final Map<DataSegment, Integer> segments = Maps.newHashMap();
private final Object lock = new Object();
private volatile boolean started = false;
private volatile boolean leader = false;
@Inject
public DruidClusterBridge(
ObjectMapper jsonMapper,
DruidClusterBridgeConfig config,
BridgeZkCoordinator bridgeZkCoordinator,
@Bridge Announcer announcer,
@Bridge final AbstractDataSegmentAnnouncer dataSegmentAnnouncer,
ServerInventoryView serverInventoryView,
CuratorFramework curator,
ScheduledExecutorFactory scheduledExecutorFactory,
AtomicReference<LeaderLatch> leaderLatch,
@Self DruidNode self
)
{
this.jsonMapper = jsonMapper;
this.config = config;
this.bridgeZkCoordinator = bridgeZkCoordinator;
this.announcer = announcer;
this.serverInventoryView = serverInventoryView;
this.curator = curator;
this.leaderLatch = leaderLatch;
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
this.self = self;
log.info("Local curator: [%s]", curator); // TODO
serverInventoryView.registerSegmentCallback(
Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("DruidClusterBridge-ServerInventoryView-%d")
.build()
),
new ServerView.BaseSegmentCallback()
{
@Override
public ServerView.CallbackAction segmentAdded(
DruidServer server, DataSegment segment
)
{
try {
synchronized (lock) {
Integer count = segments.get(segment);
if (count == null) {
segments.put(segment, 1);
dataSegmentAnnouncer.announceSegment(segment);
} else {
segments.put(segment, count + 1);
}
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentRemoved(DruidServer server, DataSegment segment)
{
try {
synchronized (lock) {
Integer count = segments.get(segment);
if (count != null) {
if (count == 0) {
dataSegmentAnnouncer.unannounceSegment(segment);
segments.remove(segment);
} else {
segments.put(segment, count - 1);
}
}
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
public boolean isLeader()
{
return leader;
}
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
return;
}
started = true;
createNewLeaderLatch();
try {
leaderLatch.get().start();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
private LeaderLatch createNewLeaderLatch()
{
final LeaderLatch newLeaderLatch = new LeaderLatch(
curator, ZKPaths.makePath(config.getConnectorPath(), CONNECTOR_OWNER_NODE), self.getHost()
);
newLeaderLatch.addListener(
new LeaderLatchListener()
{
@Override
public void isLeader()
{
becomeLeader();
}
@Override
public void notLeader()
{
stopBeingLeader();
}
},
Execs.singleThreaded("CoordinatorLeader-%s")
);
return leaderLatch.getAndSet(newLeaderLatch);
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
stopBeingLeader();
try {
leaderLatch.get().close();
}
catch (IOException e) {
log.warn(e, "Unable to close leaderLatch, ignoring");
}
exec.shutdownNow();
started = false;
}
}
private void becomeLeader()
{
synchronized (lock) {
if (!started) {
return;
}
log.info("Go-Go Gadgetmobile! Starting bridge in %s", config.getStartDelay());
try {
bridgeZkCoordinator.start();
serverInventoryView.start();
ScheduledExecutors.scheduleWithFixedDelay(
exec,
config.getStartDelay(),
config.getPeriod(),
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call()
{
if (leader) {
Iterable<DruidServer> servers = FunctionalIterable
.create(serverInventoryView.getInventory())
.filter(
new Predicate<DruidServer>()
{
@Override
public boolean apply(
DruidServer input
)
{
return !input.getType().equalsIgnoreCase("realtime");
}
}
);
long totalMaxSize = 0;
for (DruidServer server : servers) {
totalMaxSize += server.getMaxSize();
}
if (totalMaxSize == 0) {
log.warn("No servers founds!");
} else {
DruidServerMetadata me = new DruidServerMetadata(
self.getHost(),
self.getHost(),
totalMaxSize,
NODE_TYPE,
config.getTier()
);
try {
final String path = ZKPaths.makePath(config.getAnnouncementsPath(), self.getHost());
log.info("Updating [%s] to have a maxSize of[%,d] bytes", self.getHost(), totalMaxSize);
announcer.update(path, jsonMapper.writeValueAsBytes(me));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
if (leader) { // (We might no longer be coordinator)
return ScheduledExecutors.Signal.REPEAT;
} else {
return ScheduledExecutors.Signal.STOP;
}
}
}
);
leader = true;
}
catch (Exception e) {
log.makeAlert(e, "Exception becoming leader")
.emit();
final LeaderLatch oldLatch = createNewLeaderLatch();
Closeables.closeQuietly(oldLatch);
try {
leaderLatch.get().start();
}
catch (Exception e1) {
// If an exception gets thrown out here, then the coordinator will zombie out 'cause it won't be looking for
// the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but
// Curator likes to have "throws Exception" on methods so it might happen...
log.makeAlert(e1, "I am a zombie")
.emit();
}
}
}
}
private void stopBeingLeader()
{
synchronized (lock) {
try {
log.info("I'll get you next time, Gadget. Next time!");
bridgeZkCoordinator.stop();
serverInventoryView.stop();
leader = false;
}
catch (Exception e) {
log.makeAlert(e, "Unable to stopBeingLeader").emit();
}
}
}
}
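A hedged illustration of the periodic capacity roll-up in becomeLeader() above: the bridge sums maxSize over the non-realtime servers it sees in its sub-cluster and re-announces itself to the parent cluster as a single server with the combined capacity. The numbers below are made up; the calls mirror the code above:

long totalMaxSize = (100L + 250L) * 1024 * 1024 * 1024;  // two hypothetical servers, ~350GB combined
DruidServerMetadata me = new DruidServerMetadata(
    self.getHost(), self.getHost(), totalMaxSize, NODE_TYPE, config.getTier()
);
final String path = ZKPaths.makePath(config.getAnnouncementsPath(), self.getHost());
announcer.update(path, jsonMapper.writeValueAsBytes(me)); // checked exception handled by the surrounding try/catch above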

View File

@ -0,0 +1,25 @@
package io.druid.server.bridge;
import io.druid.client.DruidServer;
import io.druid.server.initialization.ZkPathsConfig;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
// TODO: make sure that this uses sub cluster zk paths
public abstract class DruidClusterBridgeConfig extends ZkPathsConfig
{
@Config("druid.server.tier")
@Default(DruidServer.DEFAULT_TIER)
public abstract String getTier();
@Config("druid.bridge.startDelay")
@Default("PT300s")
public abstract Duration getStartDelay();
@Config("druid.bridge.period")
@Default("PT60s")
public abstract Duration getPeriod();
}
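A hypothetical runtime.properties sketch for the settings above; the tier name is made up, and the other two values simply restate the defaults declared here (PT300s start delay, PT60s period):

druid.server.tier=bridge_tier
druid.bridge.startDelay=PT300s
druid.bridge.period=PT60s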

View File

@ -0,0 +1,181 @@
package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
/**
*/
public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
{
private static final EmittingLogger log = new EmittingLogger(BaseZkCoordinator.class);
private final Object lock = new Object();
private final ObjectMapper jsonMapper;
private final ZkPathsConfig zkPaths;
private final DruidServerMetadata me;
private final CuratorFramework curator;
private volatile PathChildrenCache loadQueueCache;
private volatile boolean started;
public BaseZkCoordinator(
ObjectMapper jsonMapper,
ZkPathsConfig zkPaths,
DruidServerMetadata me,
CuratorFramework curator
)
{
this.jsonMapper = jsonMapper;
this.zkPaths = zkPaths;
this.me = me;
this.curator = curator;
}
@LifecycleStart
public void start() throws IOException
{
log.info("Starting zkCoordinator for server[%s]", me);
synchronized (lock) {
if (started) {
return;
}
final String loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
final String servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName());
loadQueueCache = new PathChildrenCache(
curator,
loadQueueLocation,
true,
true,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
);
try {
createCacheDir();
log.info("Remote Curator[%s]", curator); // TODO
curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
loadCache();
loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
final ChildData child = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
final String path = child.getPath();
final DataSegmentChangeRequest segment = jsonMapper.readValue(
child.getData(), DataSegmentChangeRequest.class
);
log.info("New node[%s] with segmentClass[%s]", path, segment.getClass());
try {
segment.go(
getDataSegmentChangeHandler(),
new DataSegmentChangeCallback()
{
boolean hasRun = false;
@Override
public void execute()
{
try {
if (!hasRun) {
curator.delete().guaranteed().forPath(path);
log.info("Completed processing for node[%s]", path);
hasRun = true;
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.info(e1, "Failed to delete node[%s], but ignoring exception.", path);
}
log.makeAlert(e, "Segment load/unload: uncaught exception.")
.addData("node", path)
.addData("nodeProperties", segment)
.emit();
}
break;
case CHILD_REMOVED:
log.info("%s was removed", event.getData().getPath());
break;
default:
log.info("Ignoring event[%s]", event);
}
}
}
);
loadQueueCache.start();
}
catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
throw Throwables.propagate(e);
}
started = true;
}
}
@LifecycleStop
public void stop()
{
log.info("Stopping ZkCoordinator for [%s]", me);
synchronized (lock) {
if (!started) {
return;
}
try {
loadQueueCache.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
loadQueueCache = null;
started = false;
}
}
}
public abstract void createCacheDir();
public abstract void loadCache();
public abstract DataSegmentChangeHandler getDataSegmentChangeHandler();
}
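BaseZkCoordinator now owns the ZooKeeper load-queue plumbing (cache setup, child-event dispatch, node cleanup), while subclasses decide what loading or dropping a segment actually means. A minimal hypothetical subclass sketch, assuming only the abstract methods and the DataSegmentChangeHandler contract shown in this commit:

public class LoggingZkCoordinator extends BaseZkCoordinator
{
  private static final EmittingLogger log = new EmittingLogger(LoggingZkCoordinator.class);

  public LoggingZkCoordinator(
      ObjectMapper jsonMapper,
      ZkPathsConfig zkPaths,
      DruidServerMetadata me,
      CuratorFramework curator
  )
  {
    super(jsonMapper, zkPaths, me, curator);
  }

  @Override
  public void createCacheDir()
  {
    // no local segment cache for this hypothetical node type
  }

  @Override
  public void loadCache()
  {
    // nothing to restore on startup
  }

  @Override
  public DataSegmentChangeHandler getDataSegmentChangeHandler()
  {
    return this;
  }

  @Override
  public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
  {
    try {
      log.info("Asked to load segment[%s]", segment.getIdentifier());
    }
    finally {
      callback.execute(); // always signal completion so the ZK request node gets cleaned up
    }
  }

  @Override
  public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
  {
    try {
      log.info("Asked to drop segment[%s]", segment.getIdentifier());
    }
    finally {
      callback.execute();
    }
  }
}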

View File

@ -0,0 +1,8 @@
package io.druid.server.coordination;
/**
*/
public interface DataSegmentChangeCallback
{
public void execute();
}

View File

@ -25,6 +25,6 @@ import io.druid.timeline.DataSegment;
*/
public interface DataSegmentChangeHandler
{
public void addSegment(DataSegment segment); public void addSegment(DataSegment segment, DataSegmentChangeCallback callback);
public void removeSegment(DataSegment segment); public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback);
}

View File

@ -32,5 +32,5 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
})
public interface DataSegmentChangeRequest
{
public void go(DataSegmentChangeHandler handler); public void go(DataSegmentChangeHandler handler, DataSegmentChangeCallback callback);
}

View File

@ -46,9 +46,9 @@ public class SegmentChangeRequestDrop implements DataSegmentChangeRequest
}
@Override
public void go(DataSegmentChangeHandler handler) public void go(DataSegmentChangeHandler handler, DataSegmentChangeCallback callback)
{
handler.removeSegment(segment); handler.removeSegment(segment, callback);
}
@Override

View File

@ -39,9 +39,9 @@ public class SegmentChangeRequestLoad implements DataSegmentChangeRequest
} }
@Override @Override
public void go(DataSegmentChangeHandler handler) public void go(DataSegmentChangeHandler handler, DataSegmentChangeCallback callback)
{ {
handler.addSegment(segment); handler.addSegment(segment, callback);
} }
@JsonProperty @JsonProperty

View File

@ -24,8 +24,8 @@ package io.druid.server.coordination;
public class SegmentChangeRequestNoop implements DataSegmentChangeRequest
{
@Override
public void go(DataSegmentChangeHandler handler) public void go(DataSegmentChangeHandler handler, DataSegmentChangeCallback callback)
{
// do nothing
}
}

View File

@ -20,23 +20,14 @@
package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import java.io.File;
import java.io.IOException;
@ -44,23 +35,15 @@ import java.util.List;
/**
*/
public class ZkCoordinator implements DataSegmentChangeHandler public class ZkCoordinator extends BaseZkCoordinator
{
private static final EmittingLogger log = new EmittingLogger(ZkCoordinator.class);
private final Object lock = new Object();
private final ObjectMapper jsonMapper;
private final SegmentLoaderConfig config;
private final ZkPathsConfig zkPaths;
private final DruidServerMetadata me;
private final DataSegmentAnnouncer announcer;
private final CuratorFramework curator;
private final ServerManager serverManager;
private volatile PathChildrenCache loadQueueCache;
private volatile boolean started;
@Inject
public ZkCoordinator(
ObjectMapper jsonMapper,
@ -72,126 +55,22 @@ public class ZkCoordinator implements DataSegmentChangeHandler
ServerManager serverManager
)
{
super(jsonMapper, zkPaths, me, curator);
this.jsonMapper = jsonMapper;
this.config = config;
this.zkPaths = zkPaths;
this.me = me;
this.announcer = announcer;
this.curator = curator;
this.serverManager = serverManager;
}
@LifecycleStart @Override
public void start() throws IOException public void createCacheDir
{
log.info("Starting zkCoordinator for server[%s]", me); config.getInfoDir().mkdirs();
synchronized (lock) {
if (started) {
return;
}
final String loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
final String servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName());
loadQueueCache = new PathChildrenCache(
curator,
loadQueueLocation,
true,
true,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
);
try {
config.getInfoDir().mkdirs();
curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
loadCache();
loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
final ChildData child = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
final String path = child.getPath();
final DataSegmentChangeRequest segment = jsonMapper.readValue(
child.getData(), DataSegmentChangeRequest.class
);
log.info("New node[%s] with segmentClass[%s]", path, segment.getClass());
try {
segment.go(ZkCoordinator.this);
curator.delete().guaranteed().forPath(path);
log.info("Completed processing for node[%s]", path);
}
catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.info(e1, "Failed to delete node[%s], but ignoring exception.", path);
}
log.makeAlert(e, "Segment load/unload: uncaught exception.")
.addData("node", path)
.addData("nodeProperties", segment)
.emit();
}
break;
case CHILD_REMOVED:
log.info("%s was removed", event.getData().getPath());
break;
default:
log.info("Ignoring event[%s]", event);
}
}
}
);
loadQueueCache.start();
}
catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
throw Throwables.propagate(e);
}
started = true;
}
}
@LifecycleStop @Override
public void stop() public void loadCache()
{
log.info("Stopping ZkCoordinator with config[%s]", config);
synchronized (lock) {
if (!started) {
return;
}
try {
loadQueueCache.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
loadQueueCache = null;
started = false;
}
}
}
private void loadCache()
{
File baseDir = config.getInfoDir();
if (!baseDir.exists()) {
@ -221,11 +100,27 @@ public class ZkCoordinator implements DataSegmentChangeHandler
}
}
addSegments(cachedSegments); addSegments(
cachedSegments,
new DataSegmentChangeCallback()
{
@Override
public void execute()
{
// do nothing
}
}
);
}
@Override
public void addSegment(DataSegment segment) public DataSegmentChangeHandler getDataSegmentChangeHandler()
{
return ZkCoordinator.this;
}
@Override
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
try {
log.info("Loading segment %s", segment.getIdentifier());
@ -235,7 +130,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
loaded = serverManager.loadSegment(segment);
}
catch (Exception e) {
removeSegment(segment); removeSegment(segment, callback);
throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier()); throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
} }
@ -246,7 +141,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
jsonMapper.writeValue(segmentInfoCacheFile, segment); jsonMapper.writeValue(segmentInfoCacheFile, segment);
} }
catch (IOException e) { catch (IOException e) {
removeSegment(segment); removeSegment(segment, callback);
throw new SegmentLoadingException( throw new SegmentLoadingException(
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
); );
@ -260,16 +155,18 @@ public class ZkCoordinator implements DataSegmentChangeHandler
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier()); throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
} }
} }
} }
catch (SegmentLoadingException e) { catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segment for dataSource") log.makeAlert(e, "Failed to load segment for dataSource")
.addData("segment", segment) .addData("segment", segment)
.emit(); .emit();
} }
finally {
callback.execute();
}
} }
public void addSegments(Iterable<DataSegment> segments) public void addSegments(Iterable<DataSegment> segments, DataSegmentChangeCallback callback)
{
try {
final List<String> segmentFailures = Lists.newArrayList();
@ -284,7 +181,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
}
catch (Exception e) {
log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
removeSegment(segment); removeSegment(segment, callback);
segmentFailures.add(segment.getIdentifier());
continue;
}
@ -297,7 +194,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
}
catch (IOException e) {
log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
removeSegment(segment); removeSegment(segment, callback);
segmentFailures.add(segment.getIdentifier());
continue;
}
@ -326,11 +223,14 @@ public class ZkCoordinator implements DataSegmentChangeHandler
.addData("segments", segments)
.emit();
}
finally {
callback.execute();
}
}
@Override
public void removeSegment(DataSegment segment) public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
try {
serverManager.dropSegment(segment);
@ -347,26 +247,8 @@ public class ZkCoordinator implements DataSegmentChangeHandler
.addData("segment", segment)
.emit();
}
} finally {
callback.execute();
public void removeSegments(Iterable<DataSegment> segments)
{
try {
for (DataSegment segment : segments) {
serverManager.dropSegment(segment);
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}
}
announcer.unannounceSegments(segments);
}
catch (Exception e) {
log.makeAlert(e, "Failed to remove segments")
.addData("segments", segments)
.emit();
}
}
}

View File

@ -70,7 +70,6 @@ import org.apache.curator.utils.ZKPaths;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@ -694,11 +693,11 @@ public class DruidCoordinator
// Do coordinator stuff.
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(databaseSegmentManager.getInventory())
.withDynamicConfigs(dynamicConfigs.get())
.withEmitter(emitter)
.build();
for (DruidCoordinatorHelper helper : helpers) {
@ -731,10 +730,10 @@ public class DruidCoordinator
{
@Override
public boolean apply(
@Nullable DruidServer input DruidServer input
)
{
return input.getType().equalsIgnoreCase("historical"); return !input.getType().equalsIgnoreCase("realtime");
}
}
);

View File

@ -19,6 +19,7 @@
package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
@ -27,6 +28,7 @@ import org.joda.time.DateTime;
public class ForeverDropRule extends DropRule
{
@Override
@JsonProperty
public String getType()
{
return "dropForever";

View File

@ -41,6 +41,7 @@ public class ForeverLoadRule extends LoadRule
}
@Override
@JsonProperty
public String getType()
{
return "loadForever";

View File

@ -66,6 +66,12 @@ public abstract class ZkPathsConfig
return defaultPath("coordinator"); return defaultPath("coordinator");
} }
@Config("druid.zk.paths.connectorPath")
public String getConnectorPath()
{
return defaultPath("connector");
}
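A hypothetical override for the new connector path; if left unset it falls back to defaultPath("connector") under the configured ZooKeeper base path (the value below is illustrative):

druid.zk.paths.connectorPath=/druid/connector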
@Config("druid.zk.paths.indexer.announcementsPath") @Config("druid.zk.paths.indexer.announcementsPath")
public String getIndexerAnnouncementPath() public String getIndexerAnnouncementPath()
{ {

View File

@ -0,0 +1,158 @@
package io.druid.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.concurrent.Execs;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.announcement.Announcer;
import io.druid.db.DatabaseSegmentManager;
import io.druid.db.DatabaseSegmentManagerConfig;
import io.druid.db.DatabaseSegmentManagerProvider;
import io.druid.guice.ConfigProvider;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.ManageLifecycleLast;
import io.druid.guice.NodeTypeConfig;
import io.druid.server.bridge.Bridge;
import io.druid.server.bridge.BridgeCuratorConfig;
import io.druid.server.bridge.BridgeJettyServerInitializer;
import io.druid.server.bridge.BridgeZkCoordinator;
import io.druid.server.bridge.DruidClusterBridge;
import io.druid.server.bridge.DruidClusterBridgeConfig;
import io.druid.server.coordination.AbstractDataSegmentAnnouncer;
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.eclipse.jetty.server.Server;
import java.util.List;
/**
*/
@Command(
name = "bridge",
description = "Runs a bridge node, see http://druid.io/docs/0.6.46/Bridge.html for a description." // TODO
)
public class CliBridge extends ServerRunnable
{
private static final Logger log = new Logger(CliBridge.class);
public CliBridge()
{
super(log);
}
@Override
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new Module()
{
@Override
public void configure(Binder binder)
{
ConfigProvider.bind(binder, BridgeCuratorConfig.class);
binder.bind(BridgeZkCoordinator.class).in(ManageLifecycle.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("bridge"));
JsonConfigProvider.bind(binder, "druid.manager.segments", DatabaseSegmentManagerConfig.class);
binder.bind(DatabaseSegmentManager.class)
.toProvider(DatabaseSegmentManagerProvider.class)
.in(ManageLifecycle.class);
ConfigProvider.bind(binder, DruidClusterBridgeConfig.class);
binder.bind(DruidClusterBridge.class);
LifecycleModule.register(binder, DruidClusterBridge.class);
binder.bind(JettyServerInitializer.class).toInstance(new BridgeJettyServerInitializer());
LifecycleModule.register(binder, BridgeZkCoordinator.class);
LifecycleModule.register(binder, Server.class);
}
@Provides
@LazySingleton
@Bridge
public CuratorFramework getBridgeCurator(final BridgeCuratorConfig bridgeCuratorConfig, Lifecycle lifecycle)
{
final CuratorFramework framework =
CuratorFrameworkFactory.builder()
.connectString(bridgeCuratorConfig.getParentZkHosts())
.sessionTimeoutMs(bridgeCuratorConfig.getZkSessionTimeoutMs())
.retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
.compressionProvider(
new PotentiallyGzippedCompressionProvider(
bridgeCuratorConfig.enableCompression()
)
)
.build();
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
log.info("Starting Curator for %s", bridgeCuratorConfig.getParentZkHosts());
framework.start();
}
@Override
public void stop()
{
log.info("Stopping Curator");
framework.close();
}
}
);
return framework;
}
@Provides
@ManageLifecycle
@Bridge
public Announcer getBridgeAnnouncer(
@Bridge CuratorFramework curator
)
{
return new Announcer(curator, Execs.singleThreaded("BridgeAnnouncer-%s"));
}
@Provides
@ManageLifecycleLast
@Bridge
public AbstractDataSegmentAnnouncer getBridgeDataSegmentAnnouncer(
DruidServerMetadata metadata,
BatchDataSegmentAnnouncerConfig config,
ZkPathsConfig zkPathsConfig,
@Bridge Announcer announcer,
ObjectMapper jsonMapper
)
{
return new BatchDataSegmentAnnouncer(
metadata,
config,
zkPathsConfig,
announcer,
jsonMapper
);
}
}
);
}
}
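With the "bridge" command registered in Main below, a bridge node can be launched like any other Druid server type. A hedged invocation sketch; the classpath and configuration locations are assumptions, not part of this commit:

java -cp "config/bridge:lib/*" io.druid.cli.Main bridge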

View File

@ -51,7 +51,8 @@ public class Main
.withDefaultCommand(Help.class)
.withCommands(
CliCoordinator.class, CliHistorical.class, CliBroker.class,
CliRealtime.class, CliOverlord.class, CliMiddleManager.class CliRealtime.class, CliOverlord.class, CliMiddleManager.class,
CliBridge.class
);
builder.withGroup("example")