Merge branch 'master' of github.com:metamx/druid into az

This commit is contained in:
fjy 2013-12-19 10:52:22 -08:00
commit b6b9bb4cd8
7 changed files with 180 additions and 27 deletions

View File

@ -62,7 +62,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
try { try {
inputRow = parser.parse(value.toString()); inputRow = parser.parse(value.toString());
} }
catch (IllegalArgumentException e) { catch (Exception e) {
if (config.isIgnoreInvalidRows()) { if (config.isIgnoreInvalidRows()) {
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1); context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
return; // we're ignoring this invalid row return; // we're ignoring this invalid row

View File

@ -84,6 +84,7 @@ import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.net.URLClassLoader; import java.net.URLClassLoader;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -101,11 +102,26 @@ public class Initialization
"io.druid", "io.druid",
"com.metamx.druid" "com.metamx.druid"
); );
private final static Map<Class, Set> extensionsMap = Maps.<Class, Set>newHashMap();
public synchronized static <T> List<T> getFromExtensions(ExtensionsConfig config, Class<T> clazz) /**
* @param clazz Module class
* @param <T>
* @return Returns the set of modules loaded.
*/
public static<T> Set<T> getLoadedModules(Class<T> clazz)
{
Set<T> retVal = extensionsMap.get(clazz);
if (retVal == null) {
return Sets.newHashSet();
}
return retVal;
}
public synchronized static <T> Collection<T> getFromExtensions(ExtensionsConfig config, Class<T> clazz)
{ {
final TeslaAether aether = getAetherClient(config); final TeslaAether aether = getAetherClient(config);
List<T> retVal = Lists.newArrayList(); Set<T> retVal = Sets.newHashSet();
if (config.searchCurrentClassloader()) { if (config.searchCurrentClassloader()) {
for (T module : ServiceLoader.load(clazz, Initialization.class.getClassLoader())) { for (T module : ServiceLoader.load(clazz, Initialization.class.getClassLoader())) {
@ -131,6 +147,9 @@ public class Initialization
} }
} }
// update the map with currently loaded modules
extensionsMap.put(clazz, retVal);
return retVal; return retVal;
} }

View File

@ -19,12 +19,17 @@
package io.druid.server; package io.druid.server;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.initialization.DruidModule;
import io.druid.initialization.Initialization; import io.druid.initialization.Initialization;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/** /**
*/ */
@ -35,28 +40,20 @@ public class StatusResource
@Produces("application/json") @Produces("application/json")
public Status doGet() public Status doGet()
{ {
return getStatus(); return new Status();
}
public static Status getStatus()
{
return new Status(
Initialization.class.getPackage().getImplementationVersion(),
new Memory(Runtime.getRuntime())
);
} }
public static class Status public static class Status
{ {
final String version; final String version;
final List<ModuleVersion> modules;
final Memory memory; final Memory memory;
public Status( public Status()
String version, Memory memory
)
{ {
this.version = version; this.version = Status.class.getPackage().getImplementationVersion();
this.memory = memory; this.modules = getExtensionVersions();
this.memory = new Memory(Runtime.getRuntime());
} }
@JsonProperty @JsonProperty
@ -65,6 +62,12 @@ public class StatusResource
return version; return version;
} }
@JsonProperty
public List<ModuleVersion> getModules()
{
return modules;
}
@JsonProperty @JsonProperty
public Memory getMemory() public Memory getMemory()
{ {
@ -75,21 +78,48 @@ public class StatusResource
public String toString() public String toString()
{ {
final String NL = System.getProperty("line.separator"); final String NL = System.getProperty("line.separator");
return String.format("Druid version - %s", version) + NL; StringBuilder output = new StringBuilder();
} output.append(String.format("Druid version - %s", version)).append(NL).append(NL);
if (modules.size() > 0) {
output.append("Registered Druid Modules").append(NL);
} else {
output.append("No Druid Modules loaded !");
} }
for (ModuleVersion moduleVersion : modules) {
output.append(moduleVersion).append(NL);
}
return output.toString();
}
/**
* Load the unique extensions and return their implementation-versions
*
* @return map of extensions loaded with their respective implementation versions.
*/
private List<ModuleVersion> getExtensionVersions()
{
final Set<DruidModule> druidModules = Initialization.getLoadedModules(DruidModule.class);
List<ModuleVersion> moduleVersions = new ArrayList<>();
for (DruidModule module : druidModules) {
String artifact = module.getClass().getPackage().getImplementationTitle();
String version = module.getClass().getPackage().getImplementationVersion();
moduleVersions.add(new ModuleVersion(module.getClass().getCanonicalName(), artifact, version));
}
return moduleVersions;
}
}
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class ModuleVersion public static class ModuleVersion
{ {
final String name; final String name;
final String artifact; final String artifact;
final String version; final String version;
public ModuleVersion(String name)
{
this(name, "", "");
}
public ModuleVersion(String name, String artifact, String version) public ModuleVersion(String name, String artifact, String version)
{ {
this.name = name; this.name = name;
@ -118,7 +148,7 @@ public class StatusResource
@Override @Override
public String toString() public String toString()
{ {
if (artifact.isEmpty()) { if (artifact == null || artifact.isEmpty()) {
return String.format(" - %s ", name); return String.format(" - %s ", name);
} else { } else {
return String.format(" - %s (%s-%s)", name, artifact, version); return String.format(" - %s (%s-%s)", name, artifact, version);
@ -164,6 +194,5 @@ public class StatusResource
{ {
return usedMemory; return usedMemory;
} }
} }
} }

View File

@ -0,0 +1,103 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import io.druid.initialization.DruidModule;
import io.druid.initialization.Initialization;
import io.druid.server.initialization.ExtensionsConfig;
import junit.framework.Assert;
import org.junit.Test;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import static io.druid.server.StatusResource.ModuleVersion;
/**
*/
public class StatusResourceTest
{
private Collection<DruidModule> loadTestModule()
{
Injector baseInjector = Initialization.makeStartupInjector();
return Initialization.getFromExtensions(baseInjector.getInstance(ExtensionsConfig.class), DruidModule.class);
}
@Test
public void testLoadedModules()
{
final StatusResource resource = new StatusResource();
List<ModuleVersion> statusResourceModuleList;
statusResourceModuleList = resource.doGet().getModules();
Assert.assertEquals(
"No Modules should be loaded currently! " + statusResourceModuleList,
statusResourceModuleList.size(), 0
);
Collection<DruidModule> modules = loadTestModule();
statusResourceModuleList = resource.doGet().getModules();
Assert.assertEquals("Status should have all modules loaded!", statusResourceModuleList.size(), modules.size());
for (DruidModule module : modules) {
String moduleName = module.getClass().getCanonicalName();
boolean contains = Boolean.FALSE;
for (ModuleVersion version : statusResourceModuleList) {
if (version.getName().equals(moduleName)) {
contains = Boolean.TRUE;
}
}
Assert.assertTrue("Status resource should contain module " + moduleName, contains);
}
/*
* StatusResource only uses Initialization.getLoadedModules
*/
for (int i = 0; i < 5; i++) {
Set<DruidModule> loadedModules = Initialization.getLoadedModules(DruidModule.class);
Assert.assertEquals("Set from loaded module should be same!", loadedModules, modules);
}
}
public static class TestDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
// Do nothing
}
}
}

View File

@ -0,0 +1 @@
io.druid.server.StatusResourceTest$TestDruidModule

View File

@ -30,6 +30,7 @@ import io.druid.server.initialization.ExtensionsConfig;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import java.util.Collection;
import java.util.List; import java.util.List;
/** /**
@ -75,7 +76,7 @@ public class Main
final Injector injector = Initialization.makeStartupInjector(); final Injector injector = Initialization.makeStartupInjector();
final ExtensionsConfig config = injector.getInstance(ExtensionsConfig.class); final ExtensionsConfig config = injector.getInstance(ExtensionsConfig.class);
final List<CliCommandCreator> extensionCommands = Initialization.getFromExtensions(config, CliCommandCreator.class); final Collection<CliCommandCreator> extensionCommands = Initialization.getFromExtensions(config, CliCommandCreator.class);
for (CliCommandCreator creator : extensionCommands) { for (CliCommandCreator creator : extensionCommands) {
creator.addCommands(builder); creator.addCommands(builder);

View File

@ -31,6 +31,6 @@ public class Version implements Runnable
@Override @Override
public void run() public void run()
{ {
System.out.println(StatusResource.getStatus()); System.out.println(new StatusResource.Status());
} }
} }