1) Fix bug with Master not starting up without an indexer specified

2) Fix bug with PotentiallyGzippedCompressionProvider only catching ZipExceptions and not IOException (java 6 throws IO, java 7 throws Zip)
3) Create DruidServerMetadata and use that instead of DruidServer to represent the current server
This commit is contained in:
cheddar 2013-04-23 19:20:56 -05:00
parent f71b941a1a
commit b8ba9138ff
13 changed files with 159 additions and 113 deletions

View File

@ -33,13 +33,13 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.DruidServerConfig; import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.client.ServerInventoryThingie; import com.metamx.druid.client.ServerInventoryThingie;
import com.metamx.druid.client.ServerInventoryThingieConfig; import com.metamx.druid.client.ServerInventoryThingieConfig;
import com.metamx.druid.concurrent.Execs; import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer; import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.http.RequestLogger; import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.CuratorConfig;
@ -85,7 +85,7 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
private final ConfigurationObjectFactory configFactory; private final ConfigurationObjectFactory configFactory;
private final String nodeType; private final String nodeType;
private DruidServer druidServer = null; private DruidServerMetadata druidServerMetadata = null;
private ServiceEmitter emitter = null; private ServiceEmitter emitter = null;
private List<Monitor> monitors = null; private List<Monitor> monitors = null;
private Server server = null; private Server server = null;
@ -127,9 +127,9 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
this.nodeType = nodeType; this.nodeType = nodeType;
} }
public T setDruidServer(DruidServer druidServer) public T setDruidServerMetadata(DruidServerMetadata druidServerMetadata)
{ {
checkFieldNotSetAndSet("druidServer", druidServer); checkFieldNotSetAndSet("druidServerMetadata", druidServerMetadata);
return (T) this; return (T) this;
} }
@ -146,7 +146,7 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
checkFieldNotSetAndSet("announcer", announcer); checkFieldNotSetAndSet("announcer", announcer);
return (T) this; return (T) this;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public T setEmitter(ServiceEmitter emitter) public T setEmitter(ServiceEmitter emitter)
{ {
@ -237,10 +237,10 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
return configFactory; return configFactory;
} }
public DruidServer getDruidServer() public DruidServerMetadata getDruidServerMetadata()
{ {
initializeDruidServer(); initializeDruidServerMetadata();
return druidServer; return druidServerMetadata;
} }
public CuratorFramework getCuratorFramework() public CuratorFramework getCuratorFramework()
@ -297,10 +297,18 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
return serverInventoryThingie; return serverInventoryThingie;
} }
private void initializeDruidServer() private void initializeDruidServerMetadata()
{ {
if (druidServer == null) { if (druidServerMetadata == null) {
setDruidServer(new DruidServer(getConfigFactory().build(DruidServerConfig.class), nodeType)); final DruidServerConfig serverConfig = getConfigFactory().build(DruidServerConfig.class);
setDruidServerMetadata(
new DruidServerMetadata(
serverConfig.getServerName(),
serverConfig.getHost(),
serverConfig.getMaxSize(),
nodeType, serverConfig.getTier()
)
);
} }
} }
@ -377,14 +385,10 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
private void initializeAnnouncer() private void initializeAnnouncer()
{ {
if (announcer == null) { if (announcer == null) {
setAnnouncer( final Announcer announcer = new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s"));
new CuratorDataSegmentAnnouncer( lifecycle.addManagedInstance(announcer);
getDruidServer(),
getZkPaths(), setAnnouncer(new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper()));
new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s")),
getJsonMapper()
)
);
lifecycle.addManagedInstance(getAnnouncer()); lifecycle.addManagedInstance(getAnnouncer());
} }
} }
@ -436,8 +440,7 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
private void initializeEmitter() private void initializeEmitter()
{ {
if (emitter == null) { if (emitter == null) {
final HttpClientConfig.Builder configBuilder = HttpClientConfig.builder() final HttpClientConfig.Builder configBuilder = HttpClientConfig.builder().withNumConnections(1);
.withNumConnections(1);
final String emitterTimeoutDuration = props.getProperty("druid.emitter.timeOut"); final String emitterTimeoutDuration = props.getProperty("druid.emitter.timeOut");
if (emitterTimeoutDuration != null) { if (emitterTimeoutDuration != null) {

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.coordination.DruidServerMetadata;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -36,14 +37,10 @@ public class DruidServer implements Comparable
private final Object lock = new Object(); private final Object lock = new Object();
private final String name;
private final ConcurrentMap<String, DruidDataSource> dataSources; private final ConcurrentMap<String, DruidDataSource> dataSources;
private final ConcurrentMap<String, DataSegment> segments; private final ConcurrentMap<String, DataSegment> segments;
private final String host; private final DruidServerMetadata metadata;
private final long maxSize;
private final String type;
private final String tier;
private volatile long currSize; private volatile long currSize;
@ -70,11 +67,7 @@ public class DruidServer implements Comparable
@JsonProperty("tier") String tier @JsonProperty("tier") String tier
) )
{ {
this.name = name; this.metadata = new DruidServerMetadata(name, host, maxSize, type, tier);
this.host = host;
this.maxSize = maxSize;
this.type = type;
this.tier = tier;
this.dataSources = new ConcurrentHashMap<String, DruidDataSource>(); this.dataSources = new ConcurrentHashMap<String, DruidDataSource>();
this.segments = new ConcurrentHashMap<String, DataSegment>(); this.segments = new ConcurrentHashMap<String, DataSegment>();
@ -82,24 +75,18 @@ public class DruidServer implements Comparable
public String getName() public String getName()
{ {
return name; return metadata.getName();
} }
public Map<String, String> getStringProps() public DruidServerMetadata getMetadata()
{ {
return ImmutableMap.of( return metadata;
"name", name,
"host", host,
"maxSize", String.valueOf(maxSize),
"type", type,
"tier", tier
);
} }
@JsonProperty @JsonProperty
public String getHost() public String getHost()
{ {
return host; return metadata.getHost();
} }
@JsonProperty @JsonProperty
@ -111,19 +98,19 @@ public class DruidServer implements Comparable
@JsonProperty @JsonProperty
public long getMaxSize() public long getMaxSize()
{ {
return maxSize; return metadata.getMaxSize();
} }
@JsonProperty @JsonProperty
public String getType() public String getType()
{ {
return type; return metadata.getType();
} }
@JsonProperty @JsonProperty
public String getTier() public String getTier()
{ {
return tier; return metadata.getTier();
} }
@JsonProperty @JsonProperty
@ -175,7 +162,7 @@ public class DruidServer implements Comparable
DataSegment segment = segments.get(segmentName); DataSegment segment = segments.get(segmentName);
if (segment == null) { if (segment == null) {
log.warn("Asked to remove data segment that doesn't exist!? server[%s], segment[%s]", name, segmentName); log.warn("Asked to remove data segment that doesn't exist!? server[%s], segment[%s]", getName(), segmentName);
return this; return this;
} }
@ -186,7 +173,7 @@ public class DruidServer implements Comparable
"Asked to remove data segment from dataSource[%s] that doesn't exist, but the segment[%s] exists!?!?!?! wtf? server[%s]", "Asked to remove data segment from dataSource[%s] that doesn't exist, but the segment[%s] exists!?!?!?! wtf? server[%s]",
segment.getDataSource(), segment.getDataSource(),
segmentName, segmentName,
name getName()
); );
return this; return this;
} }
@ -224,7 +211,7 @@ public class DruidServer implements Comparable
DruidServer that = (DruidServer) o; DruidServer that = (DruidServer) o;
if (name != null ? !name.equals(that.name) : that.name != null) { if (getName() != null ? !getName().equals(that.getName()) : that.getName() != null) {
return false; return false;
} }
@ -234,19 +221,13 @@ public class DruidServer implements Comparable
@Override @Override
public int hashCode() public int hashCode()
{ {
return name != null ? name.hashCode() : 0; return getName() != null ? getName().hashCode() : 0;
} }
@Override @Override
public String toString() public String toString()
{ {
return "DruidServer{" + return metadata.toString();
"name='" + name + '\'' +
", host='" + host + '\'' +
", maxSize=" + maxSize +
", type=" + type +
", tier=" + tier +
'}';
} }
@Override @Override
@ -259,6 +240,6 @@ public class DruidServer implements Comparable
return 1; return 1;
} }
return name.compareTo(((DruidServer) o).name); return getName().compareTo(((DruidServer) o).getName());
} }
} }

View File

@ -41,7 +41,7 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
private final Object lock = new Object(); private final Object lock = new Object();
private final DruidServer server; private final DruidServerMetadata server;
private final ZkPathsConfig config; private final ZkPathsConfig config;
private final Announcer announcer; private final Announcer announcer;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
@ -50,7 +50,7 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
private volatile boolean started = false; private volatile boolean started = false;
public CuratorDataSegmentAnnouncer( public CuratorDataSegmentAnnouncer(
DruidServer server, DruidServerMetadata server,
ZkPathsConfig config, ZkPathsConfig config,
Announcer announcer, Announcer announcer,
ObjectMapper jsonMapper ObjectMapper jsonMapper

View File

@ -0,0 +1,91 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.coordination;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public class DruidServerMetadata
{
private final String name;
private final String host;
private final long maxSize;
private final String tier;
private final String type;
@JsonCreator
public DruidServerMetadata(
@JsonProperty("name") String name,
@JsonProperty("host") String host,
@JsonProperty("maxSize") long maxSize,
@JsonProperty("type") String type, @JsonProperty("tier") String tier
)
{
this.name = name;
this.host = host;
this.maxSize = maxSize;
this.tier = tier;
this.type = type;
}
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getHost()
{
return host;
}
@JsonProperty
public long getMaxSize()
{
return maxSize;
}
@JsonProperty
public String getTier()
{
return tier;
}
@JsonProperty
public String getType()
{
return type;
}
@Override
public String toString()
{
return "DruidServer{" +
"name='" + name + '\'' +
", host='" + host + '\'' +
", maxSize=" + maxSize +
", tier='" + tier + '\'' +
", type='" + type + '\'' +
'}';
}
}

View File

@ -22,6 +22,7 @@ package com.metamx.druid.curator;
import com.netflix.curator.framework.api.CompressionProvider; import com.netflix.curator.framework.api.CompressionProvider;
import com.netflix.curator.framework.imps.GzipCompressionProvider; import com.netflix.curator.framework.imps.GzipCompressionProvider;
import java.io.IOException;
import java.util.zip.ZipException; import java.util.zip.ZipException;
/** /**
@ -48,7 +49,7 @@ public class PotentiallyGzippedCompressionProvider implements CompressionProvide
try { try {
return base.decompress(path, data); return base.decompress(path, data);
} }
catch (ZipException e) { catch (IOException e) {
return data; return data;
} }
} }

View File

@ -209,6 +209,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
} }
final String inventoryPath = String.format("%s/%s", config.getContainerPath(), containerKey); final String inventoryPath = String.format("%s/%s", config.getContainerPath(), containerKey);
log.info("Creating listener on inventory[%s]", inventoryPath);
PathChildrenCache containerCache = cacheFactory.make(curatorFramework, inventoryPath); PathChildrenCache containerCache = cacheFactory.make(curatorFramework, inventoryPath);
containerCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath)); containerCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath));

View File

@ -53,7 +53,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final ZkCoordinatorConfig config; private final ZkCoordinatorConfig config;
private final DruidServer me; private final DruidServerMetadata me;
private final DataSegmentAnnouncer announcer; private final DataSegmentAnnouncer announcer;
private final CuratorFramework curator; private final CuratorFramework curator;
private final ServerManager serverManager; private final ServerManager serverManager;
@ -68,7 +68,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
public ZkCoordinator( public ZkCoordinator(
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
ZkCoordinatorConfig config, ZkCoordinatorConfig config,
DruidServer me, DruidServerMetadata me,
DataSegmentAnnouncer announcer, DataSegmentAnnouncer announcer,
CuratorFramework curator, CuratorFramework curator,
ServerManager serverManager, ServerManager serverManager,

View File

@ -31,13 +31,9 @@ import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseServerNode; import com.metamx.druid.BaseServerNode;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.ServerManager; import com.metamx.druid.coordination.ServerManager;
import com.metamx.druid.coordination.ZkCoordinator; import com.metamx.druid.coordination.ZkCoordinator;
import com.metamx.druid.coordination.ZkCoordinatorConfig; import com.metamx.druid.coordination.ZkCoordinatorConfig;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
@ -121,7 +117,7 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
final ZkCoordinator coordinator = new ZkCoordinator( final ZkCoordinator coordinator = new ZkCoordinator(
getJsonMapper(), getJsonMapper(),
getConfigFactory().build(ZkCoordinatorConfig.class), getConfigFactory().build(ZkCoordinatorConfig.class),
getDruidServer(), getDruidServerMetadata(),
getAnnouncer(), getAnnouncer(),
getCuratorFramework(), getCuratorFramework(),
serverManager, serverManager,
@ -129,7 +125,7 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
); );
lifecycle.addManagedInstance(coordinator); lifecycle.addManagedInstance(coordinator);
monitors.add(new ServerMonitor(getDruidServer(), serverManager)); monitors.add(new ServerMonitor(getDruidServerMetadata(), serverManager));
startMonitoring(monitors); startMonitoring(monitors);
final Context root = new Context(getServer(), "/", Context.SESSIONS); final Context root = new Context(getServer(), "/", Context.SESSIONS);

View File

@ -99,16 +99,13 @@ public class MasterMain
final ConfigurationObjectFactory configFactory = Config.createFactory(props); final ConfigurationObjectFactory configFactory = Config.createFactory(props);
final Lifecycle lifecycle = new Lifecycle(); final Lifecycle lifecycle = new Lifecycle();
final HttpClient httpClient = HttpClientInit.createClient( final HttpClientConfig.Builder httpClientConfigBuilder = HttpClientConfig.builder().withNumConnections(1);
HttpClientConfig.builder().withNumConnections(1).withReadTimeout(
new Duration( final String emitterTimeout = props.getProperty("druid.emitter.timeOut");
PropUtils.getProperty( if (emitterTimeout != null) {
props, httpClientConfigBuilder.withReadTimeout(new Duration(emitterTimeout));
"druid.emitter.timeOut" }
) final HttpClient httpClient = HttpClientInit.createClient(httpClientConfigBuilder.build(), lifecycle);
)
).build(), lifecycle
);
final ServiceEmitter emitter = new ServiceEmitter( final ServiceEmitter emitter = new ServiceEmitter(
PropUtils.getProperty(props, "druid.service"), PropUtils.getProperty(props, "druid.service"),

View File

@ -22,6 +22,7 @@ package com.metamx.druid.http;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides; import com.google.inject.Provides;
import com.google.inject.util.Providers;
import com.metamx.druid.client.ServerInventoryThingie; import com.metamx.druid.client.ServerInventoryThingie;
import com.metamx.druid.client.indexing.IndexingServiceClient; import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.db.DatabaseRuleManager;
@ -69,7 +70,12 @@ public class MasterServletModule extends JerseyServletModule
bind(DatabaseSegmentManager.class).toInstance(segmentInventoryManager); bind(DatabaseSegmentManager.class).toInstance(segmentInventoryManager);
bind(DatabaseRuleManager.class).toInstance(databaseRuleManager); bind(DatabaseRuleManager.class).toInstance(databaseRuleManager);
bind(DruidMaster.class).toInstance(master); bind(DruidMaster.class).toInstance(master);
bind(IndexingServiceClient.class).toInstance(indexingServiceClient); if (indexingServiceClient == null) {
bind(IndexingServiceClient.class).toProvider(Providers.<IndexingServiceClient>of(null));
}
else {
bind(IndexingServiceClient.class).toInstance(indexingServiceClient);
}
serve("/*").with(GuiceContainer.class); serve("/*").with(GuiceContainer.class);
} }

View File

@ -67,7 +67,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
if (holder.getLifetime() <= 0) { if (holder.getLifetime() <= 0) {
log.makeAlert("[%s]: Balancer move segments queue has a segment stuck", tier) log.makeAlert("[%s]: Balancer move segments queue has a segment stuck", tier)
.addData("segment", holder.getSegment().getIdentifier()) .addData("segment", holder.getSegment().getIdentifier())
.addData("server", holder.getFromServer().getStringProps()) .addData("server", holder.getFromServer().getMetadata())
.emit(); .emit();
} }
} }

View File

@ -20,6 +20,7 @@
package com.metamx.druid.metrics; package com.metamx.druid.metrics;
import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.DruidServer;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.coordination.ServerManager; import com.metamx.druid.coordination.ServerManager;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
@ -29,11 +30,11 @@ import java.util.Map;
public class ServerMonitor extends AbstractMonitor public class ServerMonitor extends AbstractMonitor
{ {
private final DruidServer druidServer; private final DruidServerMetadata druidServer;
private final ServerManager serverManager; private final ServerManager serverManager;
public ServerMonitor( public ServerMonitor(
DruidServer druidServer, DruidServerMetadata druidServer,
ServerManager serverManager ServerManager serverManager
) )
{ {

View File

@ -25,9 +25,6 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.CacheTestSegmentLoader; import com.metamx.druid.loading.CacheTestSegmentLoader;
@ -108,35 +105,7 @@ public class ZkCoordinatorTest
return cacheDir; return cacheDir;
} }
}, },
new DruidServer( new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal"),
new DruidServerConfig()
{
@Override
public String getServerName()
{
return "dummyServer";
}
@Override
public String getHost()
{
return "dummyHost";
}
@Override
public long getMaxSize()
{
return 0;
}
@Override
public String getTier()
{
return "normal";
}
},
"dummyType"
),
announcer, announcer,
curator, curator,
serverManager, serverManager,