Merge pull request #777 from metamx/initialized-endpoint

Historical endpoints accessible while loading + new loadstatus endpoint
Committed by fjy on 2014-10-06 17:35:58 -06:00 (commit d4217f1588)
11 changed files with 48 additions and 24 deletions

ApproximateHistogram.java

@@ -1019,8 +1019,6 @@ public class ApproximateHistogram
    * @param count      current size of the heap
    * @param heapIndex  index of the item to be deleted
    * @param values     values stored in the heap
-   *
-   * @return
    */
   private static int heapDelete(int[] heap, int[] reverseIndex, int count, int heapIndex, float[] values)
   {

HyperLogLogCollector.java

@@ -200,9 +200,6 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
   /**
    * Checks if the payload for the given ByteBuffer is sparse or not.
    * The given buffer must be positioned at getPayloadBytePosition() prior to calling isSparse
-   *
-   * @param buffer
-   * @return
    */
   private static boolean isSparse(ByteBuffer buffer)
   {
@@ -636,8 +633,6 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
    * @param position   The position into the byte buffer, this position represents two "registers"
    * @param offsetDiff The difference in offset between the byteToAdd and the current HyperLogLogCollector
    * @param byteToAdd  The byte to merge into the current HyperLogLogCollector
-   *
-   * @return
    */
   private static int mergeAndStoreByteRegister(
       final ByteBuffer storageBuffer,

HavingSpec.java

@@ -49,8 +49,6 @@ public interface HavingSpec
    * @param row A Row of data that may contain aggregated values
    *
    * @return true if the given row satisfies the having spec. False otherwise.
-   *
-   * @see Row
    */
   public boolean eval(Row row);
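
A HavingSpec is a post-aggregation predicate: the groupBy engine calls eval(Row) once per result row and drops rows that return false. As a minimal sketch of a custom implementation, assuming Row#getFloatMetric from io.druid.data.input.Row and the io.druid.query.groupby.having package of this era (the class name MinCountHavingSpec is hypothetical):

import io.druid.data.input.Row;
import io.druid.query.groupby.having.HavingSpec;

// Hypothetical HavingSpec that keeps only rows whose aggregated metric
// exceeds a fixed threshold, i.e. HAVING count > threshold in SQL terms.
public class MinCountHavingSpec implements HavingSpec
{
  private final String metric;
  private final float threshold;

  public MinCountHavingSpec(String metric, float threshold)
  {
    this.metric = metric;
    this.threshold = threshold;
  }

  @Override
  public boolean eval(Row row)
  {
    // true keeps the row in the groupBy result; false filters it out
    return row.getFloatMetric(metric) > threshold;
  }
}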

CompressedFloatsIndexedSupplier.java

@@ -173,8 +173,6 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
   /**
    * For testing. Do not depend on unless you like things breaking.
-   *
-   * @return
    */
   GenericIndexed<ResourceHolder<FloatBuffer>> getBaseFloatBuffers()
   {

CompressedLongsIndexedSupplier.java

@@ -184,7 +184,6 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
   /**
    * For testing. Do not use unless you like things breaking
-   * @return
    */
   GenericIndexed<ResourceHolder<LongBuffer>> getBaseLongBuffers()
   {

IncrementalIndex.java

@@ -673,10 +673,9 @@ public class IncrementalIndex implements Iterable<Row>
       falseIdsReverse = biMap.inverse();
     }

-    /**
-     * Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
-     * @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
-     */
+    // Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
+    // see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
     public String get(String value)
     {
       return value == null ? null : poorMansInterning.get(value);
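
The "poor man's interning" referenced above boils down to a map from each string to one canonical instance, so equal strings share a reference. A minimal sketch of the idea, not DimDim's actual internals (PoorMansInterner is a hypothetical name):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Map-based interning: the first thread to insert a value wins, and every
// later call returns that same canonical String instance.
public class PoorMansInterner
{
  private final ConcurrentMap<String, String> canonical = new ConcurrentHashMap<>();

  public String intern(String value)
  {
    String existing = canonical.putIfAbsent(value, value);
    return existing == null ? value : existing;
  }
}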

IncrementalIndexStorageAdapter.java

@@ -532,10 +532,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
           }

           for (String dimVal : dims[dimIndex]) {
-            /**
-             * using == here instead of .equals() to speed up lookups made possible by
-             * {@link io.druid.segment.incremental.IncrementalIndex.DimDim#poorMansInterning}
-             */
+            // using == here instead of .equals() to speed up lookups made possible by
+            // io.druid.segment.incremental.IncrementalIndex.DimDim#poorMansInterning
             if (id == dimVal) {
               return true;
             }
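
Once stored dimension values and lookup keys pass through the same interner, reference equality is safe and skips the character-by-character comparison of .equals(). A short usage sketch building on the hypothetical PoorMansInterner above:

PoorMansInterner interner = new PoorMansInterner();

// Two distinct String objects with equal contents...
String a = interner.intern(new String("page"));
String b = interner.intern(new String("page"));

// ...intern to one canonical instance, mirroring the `id == dimVal`
// check in the value matcher above.
assert a == b && a.equals(b);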

BaseZkCoordinator.java

@@ -37,7 +37,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;

 import java.io.IOException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;

 /**
@@ -212,6 +211,11 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
     }
   }

+  public boolean isStarted()
+  {
+    return started;
+  }
+
   public abstract void loadLocalCache();

   public abstract DataSegmentChangeHandler getDataSegmentChangeHandler();

ZkCoordinator.java

@@ -169,7 +169,7 @@ public class ZkCoordinator extends BaseZkCoordinator
           catch (IOException e) {
             throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
           }
-        };
+        }
       }
       catch (SegmentLoadingException e) {
         log.makeAlert(e, "Failed to load segment for dataSource")

HistoricalResource.java

@@ -0,0 +1,32 @@
+package io.druid.server.http;
+
+import com.google.common.collect.ImmutableMap;
+import io.druid.server.coordination.ZkCoordinator;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Response;
+
+@Path("/druid/historical/v1")
+public class HistoricalResource
+{
+  private final ZkCoordinator coordinator;
+
+  @Inject
+  public HistoricalResource(
+      ZkCoordinator coordinator
+  )
+  {
+    this.coordinator = coordinator;
+  }
+
+  @GET
+  @Path("/loadstatus")
+  @Produces("application/json")
+  public Response getLoadStatus()
+  {
+    return Response.ok(ImmutableMap.of("cacheInitialized", coordinator.isStarted())).build();
+  }
+}
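
The resource simply exposes BaseZkCoordinator.isStarted(), i.e. whether the local segment cache has finished loading. A minimal client sketch; the host and port 8083 are assumptions, substitute your historical node's druid.host and druid.port:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class LoadStatusCheck
{
  public static void main(String[] args) throws Exception
  {
    // Assumed address; point this at your historical node
    URL url = new URL("http://localhost:8083/druid/historical/v1/loadstatus");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
      // Prints {"cacheInitialized":false} while segments are still loading,
      // {"cacheInitialized":true} once loadLocalCache() has completed
      System.out.println(reader.readLine());
    }
  }
}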

CliHistorical.java

@@ -38,6 +38,7 @@ import io.druid.query.QuerySegmentWalker;
 import io.druid.server.QueryResource;
 import io.druid.server.coordination.ServerManager;
 import io.druid.server.coordination.ZkCoordinator;
+import io.druid.server.http.HistoricalResource;
 import io.druid.server.initialization.JettyServerInitializer;
 import io.druid.server.metrics.MetricsModule;
 import org.eclipse.jetty.server.Server;
@@ -68,6 +69,8 @@ public class CliHistorical extends ServerRunnable
       @Override
       public void configure(Binder binder)
       {
+        // register Server before binding ZkCoordinator to ensure HTTP endpoints are available immediately
+        LifecycleModule.register(binder, Server.class);
         binder.bind(ServerManager.class).in(LazySingleton.class);
         binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
         binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class);
@@ -75,10 +78,10 @@ public class CliHistorical extends ServerRunnable
         binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("historical"));
         binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);

         Jerseys.addResource(binder, QueryResource.class);
+        Jerseys.addResource(binder, HistoricalResource.class);
         LifecycleModule.register(binder, QueryResource.class);
+        LifecycleModule.register(binder, HistoricalResource.class);
         LifecycleModule.register(binder, ZkCoordinator.class);
-        LifecycleModule.register(binder, Server.class);

         binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
         JsonConfigProvider.bind(binder, "druid.historical.cache", CacheProvider.class);
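
Because Server is now registered before the ZkCoordinator binding, Jetty starts before segment loading completes, so /druid/historical/v1/loadstatus answers with cacheInitialized: false during warm-up instead of refusing connections. A deployment script can poll the endpoint before routing queries to the node; a sketch assuming the same imports as the client above (the five-second interval and substring check are illustrative choices):

// Hypothetical readiness loop: block until this historical node reports an
// initialized cache. The response shape comes from HistoricalResource above.
public static void waitForCache(String baseUrl) throws Exception
{
  while (true) {
    URL url = new URL(baseUrl + "/druid/historical/v1/loadstatus");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
      if (reader.readLine().contains("\"cacheInitialized\":true")) {
        return; // safe to send queries
      }
    }
    Thread.sleep(5000L); // poll every five seconds
  }
}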