reverting some of the last changes

Fangjin Yang 2012-11-12 16:14:48 -08:00
parent c20dccd0f4
commit 57468d39ef
8 changed files with 157 additions and 27 deletions


@@ -0,0 +1,26 @@
+package com.metamx.druid;
+
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.common.logger.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.skife.config.ConfigurationObjectFactory;
+
+import java.util.Properties;
+
+/**
+ */
+@Deprecated
+public abstract class BaseNode<T extends BaseNode> extends QueryableNode
+{
+  protected BaseNode(
+      Logger log,
+      Properties props,
+      Lifecycle lifecycle,
+      ObjectMapper jsonMapper,
+      ObjectMapper smileMapper,
+      ConfigurationObjectFactory configFactory
+  )
+  {
+    super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
+  }
+}


@@ -173,18 +173,11 @@ public abstract class QueryableNode<T extends QueryableNode> implements RegisteringNode
     return (T) this;
   }
 
-  @SuppressWarnings("unchecked")
-  public T registerHandler(Registererer registererer)
-  {
-    registererer.registerSerde();
-    return (T) this;
-  }
-
   @Override
   public void registerHandlers(Registererer... registererers)
   {
     for (Registererer registererer : registererers) {
-      registererer.registerSerde();
+      registererer.register();
       registererer.registerSubType(jsonMapper);
       registererer.registerSubType(smileMapper);
     }


@@ -31,7 +31,7 @@ import org.codehaus.jackson.map.ObjectMapper;
  */
 public interface Registererer
 {
-  public void registerSerde();
+  public void register();
 
   public void registerSubType(ObjectMapper jsonMapper);
 }
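
For context, a minimal sketch of what an implementor of the renamed interface looks like after this change. ExampleRegistererer, its method bodies, and the subtype name are assumptions for illustration; only the two interface signatures come from the diff above.

import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.jsontype.NamedType;

// Hypothetical implementor, not part of this commit.
public class ExampleRegistererer implements Registererer
{
  @Override
  public void register()
  {
    // formerly registerSerde(): register custom column serdes here
  }

  @Override
  public void registerSubType(ObjectMapper jsonMapper)
  {
    // the subtype name "example" is made up for illustration
    jsonMapper.registerSubtypes(new NamedType(ExampleRegistererer.class, "example"));
  }
}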


@@ -105,7 +105,7 @@ public class HadoopDruidIndexerConfig
       }
     );
 
     for (Registererer registererer : registererers) {
-      registererer.registerSerde();
+      registererer.register();
     }
   }


@@ -6,7 +6,6 @@ import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.druid.RegisteringNode;
 import com.metamx.druid.index.v1.serde.Registererer;
-import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.jsontype.NamedType;
 import org.joda.time.Interval;
 
@@ -66,7 +65,7 @@ public class HadoopDruidIndexerNode implements RegisteringNode
   public void registerHandlers(Registererer... registererers)
   {
     for (Registererer registererer : registererers) {
-      registererer.registerSerde();
+      registererer.register();
       registererer.registerSubType(HadoopDruidIndexerConfig.jsonMapper);
     }
   }


@@ -185,18 +185,12 @@ public class IndexerCoordinatorNode implements RegisteringNode
     this.taskRunnerFactory = taskRunnerFactory;
   }
 
-  public IndexerCoordinatorNode registerHandler(Registererer registererer)
-  {
-    registererer.registerSerde();
-    return this;
-  }
-
   @Override
   public void registerHandlers(Registererer... registererers)
   {
     for (Registererer registererer : registererers) {
-      registererer.registerSerde();
-      registererer.registerSubType(this.jsonMapper);
+      registererer.register();
+      registererer.registerSubType(jsonMapper);
     }
   }


@@ -149,17 +149,11 @@ public class WorkerNode implements RegisteringNode
     return this;
   }
 
-  public WorkerNode registerHandler(Registererer registererer)
-  {
-    registererer.registerSerde();
-    return this;
-  }
-
   @Override
   public void registerHandlers(Registererer... registererers)
   {
     for (Registererer registererer : registererers) {
-      registererer.registerSerde();
+      registererer.register();
       registererer.registerSubType(jsonMapper);
     }
   }


@@ -0,0 +1,124 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.collect.StupidPool;
+import com.metamx.druid.initialization.ServerInit;
+import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate;
+import com.metamx.druid.query.QueryRunnerFactory;
+import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
+import com.metamx.druid.utils.PropUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.skife.config.ConfigurationObjectFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ */
+public abstract class BaseServerNode<T extends QueryableNode> extends QueryableNode<T>
+{
+  private final Map<Class<? extends Query>, QueryRunnerFactory> additionalFactories = Maps.newLinkedHashMap();
+
+  private QueryRunnerFactoryConglomerate conglomerate = null;
+  private StupidPool<ByteBuffer> computeScratchPool = null;
+
+  public BaseServerNode(
+      Logger log,
+      Properties props,
+      Lifecycle lifecycle,
+      ObjectMapper jsonMapper,
+      ObjectMapper smileMapper,
+      ConfigurationObjectFactory configFactory
+  )
+  {
+    super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
+  }
+
+  public QueryRunnerFactoryConglomerate getConglomerate()
+  {
+    initializeQueryRunnerFactoryConglomerate();
+    return conglomerate;
+  }
+
+  public StupidPool<ByteBuffer> getComputeScratchPool()
+  {
+    initializeComputeScratchPool();
+    return computeScratchPool;
+  }
+
+  @SuppressWarnings("unchecked")
+  public T setConglomerate(QueryRunnerFactoryConglomerate conglomerate)
+  {
+    checkFieldNotSetAndSet("conglomerate", conglomerate);
+    return (T) this;
+  }
+
+  @SuppressWarnings("unchecked")
+  public T setComputeScratchPool(StupidPool<ByteBuffer> computeScratchPool)
+  {
+    checkFieldNotSetAndSet("computeScratchPool", computeScratchPool);
+    return (T) this;
+  }
+
+  @SuppressWarnings("unchecked")
+  public T registerQueryRunnerFactory(Class<? extends Query> queryClazz, QueryRunnerFactory factory)
+  {
+    Preconditions.checkState(
+        conglomerate == null,
+        "Registering a QueryRunnerFactory only works when a separate conglomerate is not specified."
+    );
+    Preconditions.checkState(
+        !additionalFactories.containsKey(queryClazz), "Registered factory for class[%s] multiple times", queryClazz
+    );
+
+    additionalFactories.put(queryClazz, factory);
+    return (T) this;
+  }
+
+  private void initializeComputeScratchPool()
+  {
+    if (computeScratchPool == null) {
+      setComputeScratchPool(
+          ServerInit.makeComputeScratchPool(
+              PropUtils.getPropertyAsInt(getProps(), "druid.computation.buffer.size", 1024 * 1024 * 1024)
+          )
+      );
+    }
+  }
+
+  private void initializeQueryRunnerFactoryConglomerate()
+  {
+    if (conglomerate == null) {
+      final Map<Class<? extends Query>, QueryRunnerFactory> factories = ServerInit.initDefaultQueryTypes(
+          getConfigFactory(), getComputeScratchPool()
+      );
+      for (Map.Entry<Class<? extends Query>, QueryRunnerFactory> entry : additionalFactories.entrySet()) {
+        factories.put(entry.getKey(), entry.getValue());
+      }
+      setConglomerate(new DefaultQueryRunnerFactoryConglomerate(factories));
+    }
+  }
+}
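
One ordering note implied by the checks in this new file, with a hypothetical usage sketch below: getConglomerate() builds the conglomerate exactly once, merging additionalFactories into the ServerInit defaults, and registerQueryRunnerFactory() rejects registrations once the conglomerate exists, so custom factories must be registered first. MyServerNode, MyQuery, MyQueryRunnerFactory, and the constructor arguments are assumptions for illustration; only the BaseServerNode calls come from this commit.

// Hypothetical wiring sketch; the concrete node class and factory are made up.
MyServerNode node = new MyServerNode(log, props, lifecycle, jsonMapper, smileMapper, configFactory);

// register custom factories before the conglomerate is first materialized
node.registerQueryRunnerFactory(MyQuery.class, new MyQueryRunnerFactory());

// first call builds the conglomerate; registering afterwards would throw IllegalStateException
QueryRunnerFactoryConglomerate conglomerate = node.getConglomerate();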