mirror of https://github.com/apache/druid.git
1) Eliminate PhoneBook
This commit is contained in:
parent
dde50a0d87
commit
57c31656d1
|
@ -33,8 +33,8 @@ import com.metamx.common.lifecycle.Lifecycle;
|
|||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.client.ServerInventoryThingieConfig;
|
||||
import com.metamx.druid.client.ServerInventoryThingie;
|
||||
import com.metamx.druid.client.ServerInventoryThingieConfig;
|
||||
import com.metamx.druid.concurrent.Execs;
|
||||
import com.metamx.druid.curator.announcement.Announcer;
|
||||
import com.metamx.druid.http.RequestLogger;
|
||||
|
@ -53,7 +53,6 @@ import com.metamx.metrics.Monitor;
|
|||
import com.metamx.metrics.MonitorScheduler;
|
||||
import com.metamx.metrics.MonitorSchedulerConfig;
|
||||
import com.metamx.metrics.SysMonitor;
|
||||
import com.metamx.phonebook.PhoneBook;
|
||||
import com.netflix.curator.framework.CuratorFramework;
|
||||
import org.joda.time.Duration;
|
||||
import org.mortbay.jetty.Server;
|
||||
|
|
|
@ -1,38 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.client;
|
||||
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.phonebook.PhoneBookPeon;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public interface InventoryManagementStrategy<T>
|
||||
{
|
||||
public Class<T> getContainerClass();
|
||||
public Pair<String, PhoneBookPeon<?>> makeSubListener(final T baseObject);
|
||||
public void objectRemoved(final T baseObject);
|
||||
|
||||
// These are a hack to get around a poor serialization choice, please do not use
|
||||
public boolean doesSerde();
|
||||
public T deserialize(String name, Map<String, String> properties);
|
||||
}
|
|
@ -1,198 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.phonebook;
|
||||
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class BasePhoneBook implements PhoneBook
|
||||
{
|
||||
private final Object lock = new Object();
|
||||
|
||||
private final StoppedPhoneBook stoppedPages = new StoppedPhoneBook();
|
||||
private final PhoneBook actualPages;
|
||||
|
||||
private volatile boolean started = false;
|
||||
|
||||
public BasePhoneBook(PhoneBook actualPages)
|
||||
{
|
||||
this.actualPages = actualPages;
|
||||
}
|
||||
|
||||
@Override
|
||||
@LifecycleStart
|
||||
public void start()
|
||||
{
|
||||
synchronized (lock) {
|
||||
actualPages.start();
|
||||
|
||||
for (Map.Entry<String, Map<String, Object>> services : stoppedPages.getAnnouncements().entrySet()) {
|
||||
String serviceName = services.getKey();
|
||||
for (Map.Entry<String, Object> announcements : services.getValue().entrySet()) {
|
||||
String nodeName = announcements.getKey();
|
||||
|
||||
actualPages.announce(serviceName, nodeName, announcements.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
for (Map.Entry<String, PhoneBookPeon> listenerEntry : stoppedPages.getListeners().entries()) {
|
||||
actualPages.registerListener(listenerEntry.getKey(), listenerEntry.getValue());
|
||||
}
|
||||
|
||||
started = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@LifecycleStop
|
||||
public void stop()
|
||||
{
|
||||
synchronized (lock) {
|
||||
started = false;
|
||||
|
||||
for (Map.Entry<String, Map<String, Object>> services : stoppedPages.getAnnouncements().entrySet()) {
|
||||
String serviceName = services.getKey();
|
||||
for (String nodeName : services.getValue().keySet()) {
|
||||
actualPages.unannounce(serviceName, nodeName);
|
||||
}
|
||||
}
|
||||
|
||||
for (Map.Entry<String, PhoneBookPeon> listenerEntry : stoppedPages.getListeners().entries()) {
|
||||
actualPages.unregisterListener(listenerEntry.getKey(), listenerEntry.getValue());
|
||||
}
|
||||
|
||||
actualPages.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStarted()
|
||||
{
|
||||
return started;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void announce(String serviceName, String nodeName, T properties)
|
||||
{
|
||||
synchronized (lock) {
|
||||
stoppedPages.announce(serviceName, nodeName, properties);
|
||||
|
||||
if (started) {
|
||||
actualPages.announce(serviceName, nodeName, properties);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unannounce(String serviceName, String nodeName)
|
||||
{
|
||||
synchronized (lock) {
|
||||
stoppedPages.unannounce(serviceName, nodeName);
|
||||
|
||||
if (started) {
|
||||
actualPages.unannounce(serviceName, nodeName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T lookup(String serviceName, Class<? extends T> clazz)
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (! started) {
|
||||
throw new ISE("Cannot lookup on a stopped PhoneBook.");
|
||||
}
|
||||
|
||||
return actualPages.lookup(serviceName, clazz);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void post(String serviceName, String nodeName, T properties)
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (! started) {
|
||||
throw new ISE("Cannot post to a stopped PhoneBook.");
|
||||
}
|
||||
|
||||
actualPages.post(serviceName, nodeName, properties);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean unpost(String serviceName, String nodeName)
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (! started) {
|
||||
throw new ISE("Cannot post to a stopped PhoneBook.");
|
||||
}
|
||||
|
||||
return actualPages.unpost(serviceName, nodeName);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void postEphemeral(String serviceName, String nodeName, T properties)
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (! started) {
|
||||
throw new ISE("Cannot post to a stopped PhoneBook.");
|
||||
}
|
||||
|
||||
actualPages.postEphemeral(serviceName, nodeName, properties);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void registerListener(String serviceName, PhoneBookPeon<T> peon)
|
||||
{
|
||||
synchronized (lock) {
|
||||
stoppedPages.registerListener(serviceName, peon);
|
||||
|
||||
if (started) {
|
||||
actualPages.registerListener(serviceName, peon);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void unregisterListener(String serviceName, PhoneBookPeon<T> peon)
|
||||
{
|
||||
synchronized (lock) {
|
||||
stoppedPages.unregisterListener(serviceName, peon);
|
||||
|
||||
if (started) {
|
||||
actualPages.unregisterListener(serviceName, peon);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String combineParts(List<String> parts)
|
||||
{
|
||||
return actualPages.combineParts(parts);
|
||||
}
|
||||
}
|
|
@ -1,55 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.phonebook;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A PhoneBook object is just like a phone book. You can publish ("announce") your services to it as well as
|
||||
* find out about other people publishing their services (registerListener).
|
||||
*
|
||||
* Finding out about other people's announcements is accomplished by employing a Peon, who gets notified of
|
||||
* announcements coming and going and does something with them.
|
||||
*/
|
||||
public interface PhoneBook
|
||||
{
|
||||
public void start();
|
||||
public void stop();
|
||||
public boolean isStarted();
|
||||
public <T> void announce(String serviceName, String nodeName, T properties);
|
||||
public void unannounce(String serviceName, String nodeName);
|
||||
public <T> T lookup(String serviceName, Class<? extends T> clazz);
|
||||
public <T> void post(String serviceName, String nodeName, T properties);
|
||||
public boolean unpost(String serviceName, String nodeName);
|
||||
public <T> void postEphemeral(String serviceName, String nodeName, T properties);
|
||||
public <T> void registerListener(String serviceName, PhoneBookPeon<T> peon);
|
||||
public <T> void unregisterListener(String serviceName, PhoneBookPeon<T> peon);
|
||||
|
||||
/**
|
||||
* A method to combine a number of hierarchical parts into a String that would "work" for this PhoneBook implementation.
|
||||
*
|
||||
* I.e., a call to combineParts("A", "B") should return the String "serviceName" that can be used to register a
|
||||
* listener underneath something that was announced via a call to announce("A", "B", {})
|
||||
*
|
||||
* @param parts
|
||||
* @return
|
||||
*/
|
||||
public String combineParts(List<String> parts);
|
||||
}
|
|
@ -1,32 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.phonebook;
|
||||
|
||||
/**
|
||||
* A PhoneBookPeon is a Dilbert-like character who sits in his cubicle all day just waiting to hear about newEntry()s
|
||||
* and removed entries. He acts as the go between, someone that the PhoneBook knows how to talk to and can then
|
||||
* translate said message into whatever his employer wants.
|
||||
*/
|
||||
public interface PhoneBookPeon<T>
|
||||
{
|
||||
public Class<T> getObjectClazz();
|
||||
public void newEntry(String name, T properties);
|
||||
public void entryRemoved(String name);
|
||||
}
|
|
@ -1,32 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.phonebook;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A ServiceLookup is an object that, when given a key, will return a metadata map for that key. This was created
|
||||
* for use in doing things like consistent hashing, where the lookupKey represents the partition key and the
|
||||
* metadata map has stuff like host and port in it (basically, the information required to be able to contact the server)
|
||||
*/
|
||||
public interface ServiceLookup
|
||||
{
|
||||
public Map<String, String> get(String lookupKey);
|
||||
}
|
|
@ -1,175 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.phonebook;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.Constraint;
|
||||
import com.google.common.collect.Constraints;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.Multimaps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* A class that collects announcements for you. Can be used to simplify start/stop logic
|
||||
*
|
||||
* Not thread-safe
|
||||
*/
|
||||
class StoppedPhoneBook implements PhoneBook
|
||||
{
|
||||
private static final Logger log = new Logger(StoppedPhoneBook.class);
|
||||
|
||||
private final Map<String, Map<String, Object>> announcements = Maps.newHashMap();
|
||||
private final Multimap<String, PhoneBookPeon> listeners = Multimaps.newSetMultimap(
|
||||
Maps.<String, Collection<PhoneBookPeon>>newHashMap(),
|
||||
new Supplier<Set<PhoneBookPeon>>()
|
||||
{
|
||||
@Override
|
||||
public Set<PhoneBookPeon> get()
|
||||
{
|
||||
final HashSet<PhoneBookPeon> theSet = Sets.newHashSet();
|
||||
return Constraints.constrainedSet(
|
||||
theSet,
|
||||
new Constraint<PhoneBookPeon>()
|
||||
{
|
||||
@Override
|
||||
public PhoneBookPeon checkElement(PhoneBookPeon element)
|
||||
{
|
||||
if (theSet.contains(element)) {
|
||||
throw new IAE("Listener[%s] has already been registered", element);
|
||||
}
|
||||
|
||||
return element;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
@Override
|
||||
public void start()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStarted()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void announce(String serviceName, String nodeName, T properties)
|
||||
{
|
||||
Map<String, Object> serviceAnnouncements = announcements.get(serviceName);
|
||||
|
||||
if (serviceAnnouncements == null) {
|
||||
serviceAnnouncements = Maps.newHashMap();
|
||||
announcements.put(serviceName, serviceAnnouncements);
|
||||
}
|
||||
|
||||
serviceAnnouncements.put(nodeName, properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unannounce(String serviceName, String nodeName)
|
||||
{
|
||||
Map<String, Object> serviceAnnouncements = announcements.get(serviceName);
|
||||
|
||||
if (serviceAnnouncements == null) {
|
||||
throw new IAE("Cannot unannounce[%s]: No announcements for service[%s]", nodeName, serviceName);
|
||||
}
|
||||
|
||||
if (! serviceAnnouncements.containsKey(nodeName)) {
|
||||
log.warn("Cannot unannounce[%s]: it doesn't exist for service[%s]", nodeName, serviceName);
|
||||
return;
|
||||
}
|
||||
|
||||
serviceAnnouncements.remove(nodeName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T lookup(String serviceName, Class<? extends T> clazz)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void post(String serviceName, String nodeName, T properties)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean unpost(String serviceName, String nodeName)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void postEphemeral(String serviceName, String nodeName, T properties)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void registerListener(String serviceName, PhoneBookPeon<T> peon)
|
||||
{
|
||||
listeners.put(serviceName, peon);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void unregisterListener(String serviceName, PhoneBookPeon<T> peon)
|
||||
{
|
||||
if (! listeners.remove(serviceName, peon)) {
|
||||
throw new IAE("Cannot unregister listener[%s] on service[%s] that wasn't first registered.", serviceName, peon);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String combineParts(List<String> parts)
|
||||
{
|
||||
throw new UnsupportedOperationException("This should never be called");
|
||||
}
|
||||
|
||||
public Map<String, Map<String, Object>> getAnnouncements()
|
||||
{
|
||||
return announcements;
|
||||
}
|
||||
|
||||
public Multimap<String, PhoneBookPeon> getListeners()
|
||||
{
|
||||
return listeners;
|
||||
}
|
||||
}
|
|
@ -1,56 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.coordination;
|
||||
|
||||
import com.metamx.druid.master.DruidMaster;
|
||||
import com.metamx.phonebook.PhoneBook;
|
||||
import com.netflix.curator.framework.recipes.leader.LeaderLatch;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class DruidClusterInfo
|
||||
{
|
||||
private final DruidClusterInfoConfig config;
|
||||
private final PhoneBook yp;
|
||||
|
||||
public DruidClusterInfo(
|
||||
DruidClusterInfoConfig config,
|
||||
PhoneBook zkPhoneBook
|
||||
)
|
||||
{
|
||||
this.config = config;
|
||||
this.yp = zkPhoneBook;
|
||||
}
|
||||
|
||||
public Map<String, String> lookupCurrentLeader()
|
||||
{
|
||||
return (Map<String, String>) yp.lookup(
|
||||
yp.combineParts(Arrays.asList(config.getMasterPath(), DruidMaster.MASTER_OWNER_NODE)), Map.class
|
||||
);
|
||||
}
|
||||
|
||||
public String getMasterHost()
|
||||
{
|
||||
return lookupCurrentLeader().get("host");
|
||||
}
|
||||
}
|
|
@ -21,15 +21,11 @@ package com.metamx.druid.coordination;
|
|||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.client.DruidServer;
|
||||
import com.metamx.druid.curator.announcement.Announcer;
|
||||
|
@ -37,25 +33,15 @@ import com.metamx.druid.loading.SegmentLoadingException;
|
|||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.AlertEvent;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.phonebook.PhoneBook;
|
||||
import com.metamx.phonebook.PhoneBookPeon;
|
||||
|
||||
import com.netflix.curator.framework.CuratorFramework;
|
||||
import com.netflix.curator.framework.recipes.cache.ChildData;
|
||||
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
|
||||
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
|
||||
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
|
||||
import com.netflix.curator.utils.ZKPaths;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -72,7 +58,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||
private final CuratorFramework curator;
|
||||
private final ServerManager serverManager;
|
||||
private final ServiceEmitter emitter;
|
||||
private final List<Pair<String, PhoneBookPeon<?>>> peons;
|
||||
|
||||
private final String loadQueueLocation;
|
||||
private final String servedSegmentsLocation;
|
||||
|
@ -98,7 +83,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||
this.serverManager = serverManager;
|
||||
this.emitter = emitter;
|
||||
|
||||
this.peons = new ArrayList<Pair<String, PhoneBookPeon<?>>>();
|
||||
this.loadQueueLocation = ZKPaths.makePath(config.getLoadQueueLocation(), me.getName());
|
||||
this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsLocation(), me.getName());
|
||||
}
|
||||
|
|
|
@ -30,11 +30,10 @@ import com.metamx.druid.client.DataSegment;
|
|||
import com.metamx.druid.client.DruidDataSource;
|
||||
import com.metamx.druid.client.DruidServer;
|
||||
import com.metamx.druid.client.ServerInventoryThingie;
|
||||
import com.metamx.druid.coordination.DruidClusterInfo;
|
||||
import com.metamx.druid.client.indexing.IndexingServiceClient;
|
||||
import com.metamx.druid.db.DatabaseRuleManager;
|
||||
import com.metamx.druid.db.DatabaseSegmentManager;
|
||||
import com.metamx.druid.master.DruidMaster;
|
||||
import com.metamx.druid.client.indexing.IndexingServiceClient;
|
||||
import com.metamx.druid.master.rules.Rule;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
|
@ -65,7 +64,6 @@ public class InfoResource
|
|||
private final ServerInventoryThingie serverInventoryThingie;
|
||||
private final DatabaseSegmentManager databaseSegmentManager;
|
||||
private final DatabaseRuleManager databaseRuleManager;
|
||||
private final DruidClusterInfo druidClusterInfo;
|
||||
private final IndexingServiceClient indexingServiceClient;
|
||||
|
||||
@Inject
|
||||
|
@ -74,7 +72,6 @@ public class InfoResource
|
|||
ServerInventoryThingie serverInventoryThingie,
|
||||
DatabaseSegmentManager databaseSegmentManager,
|
||||
DatabaseRuleManager databaseRuleManager,
|
||||
DruidClusterInfo druidClusterInfo,
|
||||
IndexingServiceClient indexingServiceClient
|
||||
)
|
||||
{
|
||||
|
@ -82,7 +79,6 @@ public class InfoResource
|
|||
this.serverInventoryThingie = serverInventoryThingie;
|
||||
this.databaseSegmentManager = databaseSegmentManager;
|
||||
this.databaseRuleManager = databaseRuleManager;
|
||||
this.druidClusterInfo = druidClusterInfo;
|
||||
this.indexingServiceClient = indexingServiceClient;
|
||||
}
|
||||
|
||||
|
@ -92,7 +88,7 @@ public class InfoResource
|
|||
public Response getMaster()
|
||||
{
|
||||
return Response.status(Response.Status.OK)
|
||||
.entity(druidClusterInfo.lookupCurrentLeader())
|
||||
.entity(master.getCurrentMaster())
|
||||
.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -39,8 +39,6 @@ import com.metamx.druid.concurrent.Execs;
|
|||
import com.metamx.druid.config.ConfigManager;
|
||||
import com.metamx.druid.config.ConfigManagerConfig;
|
||||
import com.metamx.druid.config.JacksonConfigManager;
|
||||
import com.metamx.druid.coordination.DruidClusterInfo;
|
||||
import com.metamx.druid.coordination.DruidClusterInfoConfig;
|
||||
import com.metamx.druid.db.DatabaseRuleManager;
|
||||
import com.metamx.druid.db.DatabaseRuleManagerConfig;
|
||||
import com.metamx.druid.db.DatabaseSegmentManager;
|
||||
|
@ -187,11 +185,6 @@ public class MasterMain
|
|||
indexingServiceClient = new IndexingServiceClient(httpClient, jsonMapper, serviceProvider);
|
||||
}
|
||||
|
||||
final DruidClusterInfo druidClusterInfo = new DruidClusterInfo(
|
||||
configFactory.build(DruidClusterInfoConfig.class),
|
||||
null // TODO
|
||||
);
|
||||
|
||||
final ConfigManagerConfig configManagerConfig = configFactory.build(ConfigManagerConfig.class);
|
||||
DbConnector.createConfigTable(dbi, configManagerConfig.getConfigTable());
|
||||
JacksonConfigManager configManager = new JacksonConfigManager(
|
||||
|
@ -204,7 +197,6 @@ public class MasterMain
|
|||
|
||||
final DruidMaster master = new DruidMaster(
|
||||
druidMasterConfig,
|
||||
druidClusterInfo,
|
||||
configManager,
|
||||
databaseSegmentManager,
|
||||
serverInventoryThingie,
|
||||
|
@ -244,7 +236,6 @@ public class MasterMain
|
|||
serverInventoryThingie,
|
||||
databaseSegmentManager,
|
||||
databaseRuleManager,
|
||||
druidClusterInfo,
|
||||
master,
|
||||
jsonMapper,
|
||||
indexingServiceClient
|
||||
|
@ -265,9 +256,18 @@ public class MasterMain
|
|||
public URL getRedirectURL(String queryString, String requestURI)
|
||||
{
|
||||
try {
|
||||
return (queryString == null) ?
|
||||
new URL(String.format("http://%s%s", druidClusterInfo.getMasterHost(), requestURI)) :
|
||||
new URL(String.format("http://%s%s?%s", druidClusterInfo.getMasterHost(), requestURI, queryString));
|
||||
final String currentMaster = master.getCurrentMaster();
|
||||
if (currentMaster == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String location = String.format("http://%s%s", currentMaster, requestURI);
|
||||
|
||||
if (queryString != null) {
|
||||
location = String.format("%s?%s", location, queryString);
|
||||
}
|
||||
|
||||
return new URL(location);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
|
|
@ -23,11 +23,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
|
||||
import com.google.inject.Provides;
|
||||
import com.metamx.druid.client.ServerInventoryThingie;
|
||||
import com.metamx.druid.coordination.DruidClusterInfo;
|
||||
import com.metamx.druid.client.indexing.IndexingServiceClient;
|
||||
import com.metamx.druid.db.DatabaseRuleManager;
|
||||
import com.metamx.druid.db.DatabaseSegmentManager;
|
||||
import com.metamx.druid.master.DruidMaster;
|
||||
import com.metamx.druid.client.indexing.IndexingServiceClient;
|
||||
import com.sun.jersey.guice.JerseyServletModule;
|
||||
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
|
||||
|
||||
|
@ -40,7 +39,6 @@ public class MasterServletModule extends JerseyServletModule
|
|||
private final ServerInventoryThingie serverInventoryThingie;
|
||||
private final DatabaseSegmentManager segmentInventoryManager;
|
||||
private final DatabaseRuleManager databaseRuleManager;
|
||||
private final DruidClusterInfo druidClusterInfo;
|
||||
private final DruidMaster master;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final IndexingServiceClient indexingServiceClient;
|
||||
|
@ -49,7 +47,6 @@ public class MasterServletModule extends JerseyServletModule
|
|||
ServerInventoryThingie serverInventoryThingie,
|
||||
DatabaseSegmentManager segmentInventoryManager,
|
||||
DatabaseRuleManager databaseRuleManager,
|
||||
DruidClusterInfo druidClusterInfo,
|
||||
DruidMaster master,
|
||||
ObjectMapper jsonMapper,
|
||||
IndexingServiceClient indexingServiceClient
|
||||
|
@ -58,7 +55,6 @@ public class MasterServletModule extends JerseyServletModule
|
|||
this.serverInventoryThingie = serverInventoryThingie;
|
||||
this.segmentInventoryManager = segmentInventoryManager;
|
||||
this.databaseRuleManager = databaseRuleManager;
|
||||
this.druidClusterInfo = druidClusterInfo;
|
||||
this.master = master;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.indexingServiceClient = indexingServiceClient;
|
||||
|
@ -73,7 +69,6 @@ public class MasterServletModule extends JerseyServletModule
|
|||
bind(DatabaseSegmentManager.class).toInstance(segmentInventoryManager);
|
||||
bind(DatabaseRuleManager.class).toInstance(databaseRuleManager);
|
||||
bind(DruidMaster.class).toInstance(master);
|
||||
bind(DruidClusterInfo.class).toInstance(druidClusterInfo);
|
||||
bind(IndexingServiceClient.class).toInstance(indexingServiceClient);
|
||||
|
||||
serve("/*").with(GuiceContainer.class);
|
||||
|
|
|
@ -63,11 +63,16 @@ public class RedirectServlet extends DefaultServlet
|
|||
if (redirectInfo.doLocal()) {
|
||||
super.service(request, response);
|
||||
} else {
|
||||
URL url = redirectInfo.getRedirectURL(request.getQueryString(), request.getRequestURI());
|
||||
log.info("Forwarding request to [%s]", url);
|
||||
final URL redirectURL = redirectInfo.getRedirectURL(request.getQueryString(), request.getRequestURI());
|
||||
if (redirectURL == null) {
|
||||
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
|
||||
}
|
||||
else {
|
||||
log.info("Forwarding request to [%s]", redirectURL);
|
||||
|
||||
response.setStatus(HttpServletResponse.SC_MOVED_TEMPORARILY);
|
||||
response.setHeader("Location", url.toString());
|
||||
response.setStatus(HttpServletResponse.SC_MOVED_TEMPORARILY);
|
||||
response.setHeader("Location", redirectURL.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,7 +42,6 @@ import com.metamx.druid.client.ServerInventoryThingie;
|
|||
import com.metamx.druid.client.indexing.IndexingServiceClient;
|
||||
import com.metamx.druid.concurrent.Execs;
|
||||
import com.metamx.druid.config.JacksonConfigManager;
|
||||
import com.metamx.druid.coordination.DruidClusterInfo;
|
||||
import com.metamx.druid.db.DatabaseRuleManager;
|
||||
import com.metamx.druid.db.DatabaseSegmentManager;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
|
@ -79,7 +78,6 @@ public class DruidMaster
|
|||
private volatile boolean master = false;
|
||||
|
||||
private final DruidMasterConfig config;
|
||||
private final DruidClusterInfo clusterInfo;
|
||||
private final JacksonConfigManager configManager;
|
||||
private final DatabaseSegmentManager databaseSegmentManager;
|
||||
private final ServerInventoryThingie serverInventoryThingie;
|
||||
|
@ -95,7 +93,6 @@ public class DruidMaster
|
|||
|
||||
public DruidMaster(
|
||||
DruidMasterConfig config,
|
||||
DruidClusterInfo clusterInfo,
|
||||
JacksonConfigManager configManager,
|
||||
DatabaseSegmentManager databaseSegmentManager,
|
||||
ServerInventoryThingie serverInventoryThingie,
|
||||
|
@ -108,7 +105,6 @@ public class DruidMaster
|
|||
)
|
||||
{
|
||||
this.config = config;
|
||||
this.clusterInfo = clusterInfo;
|
||||
this.configManager = configManager;
|
||||
|
||||
this.databaseSegmentManager = databaseSegmentManager;
|
||||
|
@ -201,6 +197,17 @@ public class DruidMaster
|
|||
databaseSegmentManager.enableDatasource(ds);
|
||||
}
|
||||
|
||||
public String getCurrentMaster()
|
||||
{
|
||||
try {
|
||||
final LeaderLatch latch = leaderLatch.get();
|
||||
return latch == null ? null : latch.getLeader().getId();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback)
|
||||
{
|
||||
final DruidServer fromServer = serverInventoryThingie.getInventoryValue(from);
|
||||
|
@ -618,9 +625,9 @@ public class DruidMaster
|
|||
{
|
||||
try {
|
||||
synchronized (lock) {
|
||||
Map<String, String> currLeader = clusterInfo.lookupCurrentLeader();
|
||||
if (currLeader == null || !config.getHost().equals(currLeader.get("host"))) {
|
||||
log.info("I thought I was the master, but really [%s] is. Phooey.", currLeader);
|
||||
final LeaderLatch latch = leaderLatch.get();
|
||||
if (latch == null || !latch.hasLeadership()) {
|
||||
log.info("[%s] is master, not me. Phooey.", latch == null ? null : latch.getLeader().getId());
|
||||
stopBeingMaster();
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -25,10 +25,7 @@ import com.google.common.collect.Maps;
|
|||
import com.google.common.collect.MinMaxPriorityQueue;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.client.DruidServer;
|
||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||
import com.metamx.druid.shard.NoneShardSpec;
|
||||
import com.metamx.phonebook.PhoneBook;
|
||||
import com.netflix.curator.framework.CuratorFramework;
|
||||
import junit.framework.Assert;
|
||||
import org.easymock.EasyMock;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -40,8 +37,6 @@ import org.junit.Test;
|
|||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
|
|
@ -140,7 +140,6 @@ public class DruidMasterTest
|
|||
}
|
||||
},
|
||||
null,
|
||||
null,
|
||||
databaseSegmentManager,
|
||||
serverInventoryThingie,
|
||||
null,
|
||||
|
|
Loading…
Reference in New Issue