Merge branch 'master' of github.com:metamx/druid

fjy 2014-12-15 09:42:08 -08:00
commit 56dfe5d883
9 changed files with 259 additions and 74 deletions

View File

@@ -15,7 +15,7 @@ The coordinator module uses several of the default modules in [Configuration](Co
 |`druid.coordinator.period`|The run period for the coordinator. The coordinator operates by maintaining the current state of the world in memory and periodically looking at the set of segments available and segments being served to make decisions about whether any changes need to be made to the data topology. This property sets the delay between each of these runs.|PT60S|
 |`druid.coordinator.period.indexingPeriod`|How often to send indexing tasks to the indexing service. Only applies if merge or conversion is turned on.|PT1800S (30 mins)|
 |`druid.coordinator.startDelay`|The operation of the Coordinator works on the assumption that it has an up-to-date view of the state of the world when it runs; however, the current ZK interaction code is written in a way that doesn't allow the Coordinator to know for a fact that it's done loading the current state of the world. This delay is a hack to give it enough time to believe that it has all the data.|PT300S|
-|`druid.coordinator.merge.on`|Boolean flag for whether or not the coordinator should try and merge small segments into a more optimal segment size.|PT300S|
+|`druid.coordinator.merge.on`|Boolean flag for whether or not the coordinator should try and merge small segments into a more optimal segment size.|false|
 |`druid.coordinator.conversion.on`|Boolean flag for converting old segment indexing versions to the latest segment indexing version.|false|
 |`druid.coordinator.load.timeout`|The timeout duration for when the coordinator assigns a segment to a historical node.|15 minutes|
 |`druid.manager.segment.pollDuration`|The duration between polls the Coordinator does for updates to the set of active segments. Generally defines the amount of lag time it can take for the coordinator to notice new segments.|PT1M|
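For context, a coordinator `runtime.properties` assembled from the table above might look like the sketch below. Values shown are the listed defaults; expressing the 15-minute load timeout as the ISO-8601 duration `PT15M` is an assumption about the expected format.

```properties
druid.coordinator.period=PT60S
druid.coordinator.period.indexingPeriod=PT1800S
druid.coordinator.startDelay=PT300S
druid.coordinator.merge.on=false
druid.coordinator.conversion.on=false
# Assumed ISO-8601 form of the "15 minutes" default
druid.coordinator.load.timeout=PT15M
druid.manager.segment.pollDuration=PT1M
```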

View File

@@ -210,22 +210,22 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowP
     };

     private long rowCount = 0L;
-    private boolean waitIfmax = (maxEventCount < 0L);
+    private boolean waitIfmax = (getMaxEventCount() < 0L);
     private final Map<String, Object> theMap = new HashMap<String, Object>(2);
     // DIY json parsing // private final ObjectMapper omapper = new ObjectMapper();

     private boolean maxTimeReached()
     {
-      if (maxRunMinutes <= 0) {
+      if (getMaxRunMinutes() <= 0) {
         return false;
       } else {
-        return (System.currentTimeMillis() - startMsec) / 60000L >= maxRunMinutes;
+        return (System.currentTimeMillis() - startMsec) / 60000L >= getMaxRunMinutes();
       }
     }

     private boolean maxCountReached()
     {
-      return maxEventCount >= 0 && rowCount >= maxEventCount;
+      return getMaxEventCount() >= 0 && rowCount >= getMaxEventCount();
     }

     @Override
@@ -311,4 +311,15 @@
     };
   }
+
+  @JsonProperty
+  public int getMaxEventCount()
+  {
+    return maxEventCount;
+  }
+
+  @JsonProperty
+  public int getMaxRunMinutes()
+  {
+    return maxRunMinutes;
+  }
 }
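The getters added here are annotated with `@JsonProperty` so Jackson can write `maxEventCount` and `maxRunMinutes` back out when the firehose factory is serialized. A minimal, self-contained sketch of that round-trip pattern (the `SpritzerConfig` class here is hypothetical, not part of the patch):

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SpritzerConfig
{
  private final int maxEventCount;

  @JsonCreator
  public SpritzerConfig(@JsonProperty("maxEventCount") int maxEventCount)
  {
    this.maxEventCount = maxEventCount;
  }

  @JsonProperty
  public int getMaxEventCount()
  {
    return maxEventCount;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // Without the annotated getter, the private field would be dropped on write.
    String json = mapper.writeValueAsString(new SpritzerConfig(500));
    System.out.println(json); // {"maxEventCount":500}
    System.out.println(mapper.readValue(json, SpritzerConfig.class).getMaxEventCount()); // 500
  }
}
```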

View File

@@ -412,9 +412,7 @@ public class OverlordResource
     try {
       final Optional<ByteSource> stream = taskLogStreamer.streamTaskLog(taskid, offset);
       if (stream.isPresent()) {
-        try (InputStream istream = stream.get().openStream()) {
-          return Response.ok(istream).build();
-        }
+        return Response.ok(stream.get().openStream()).build();
       } else {
         return Response.status(Response.Status.NOT_FOUND)
             .entity(
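The removed try-with-resources closed the log stream before the container had written the response body; returning the still-open stream lets the JAX-RS container read it while writing the entity and close it afterwards. A minimal sketch of the pattern (the `LogResource` class is illustrative, not from this patch):

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

@Path("/logs")
public class LogResource
{
  @GET
  public Response streamLog()
  {
    InputStream in = new ByteArrayInputStream("log line\n".getBytes());
    // Hand the container an open stream: it is consumed while the entity is
    // written and closed by the container afterwards. Wrapping the return in
    // try-with-resources would close it before a single byte was sent.
    return Response.ok(in).build();
  }
}
```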

View File

@@ -20,6 +20,7 @@
 package io.druid.indexing.common.task;

 import com.google.common.collect.Lists;
+import com.google.common.io.Files;
 import com.metamx.common.Granularity;
 import io.druid.data.input.impl.CSVParseSpec;
 import io.druid.data.input.impl.DimensionsSpec;
@@ -57,7 +58,10 @@ public class IndexTaskTest
   @Test
   public void testDeterminePartitions() throws Exception
   {
-    File tmpFile = File.createTempFile("druid", "index");
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+
+    File tmpFile = File.createTempFile("druid", "index", tmpDir);
     tmpFile.deleteOnExit();

     PrintWriter writer = new PrintWriter(tmpFile);
@@ -97,7 +101,7 @@ public class IndexTaskTest
         ),
         new IndexTask.IndexIOConfig(
             new LocalFirehoseFactory(
-                tmpFile.getParentFile(),
+                tmpDir,
                 "druid*",
                 null
             )
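The motivation for the dedicated temp directory: `File.createTempFile("druid", "index")` lands in the shared system temp directory, so pointing `LocalFirehoseFactory` at `tmpFile.getParentFile()` with the glob `druid*` could sweep in leftover files from other runs. A standalone sketch of the fixed pattern:

```java
import com.google.common.io.Files;
import java.io.File;

public class TempDirSketch
{
  public static void main(String[] args) throws Exception
  {
    // A fresh directory: the only file a "druid*" glob can match here
    // is the one this process just created.
    File tmpDir = Files.createTempDir();
    tmpDir.deleteOnExit();
    File tmpFile = File.createTempFile("druid", "index", tmpDir);
    tmpFile.deleteOnExit();
    System.out.println(tmpFile.getAbsolutePath());
  }
}
```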

View File

@@ -66,6 +66,8 @@ public class BrokerServerView implements TimelineServerView
   private final ServerInventoryView baseView;
   private final TierSelectorStrategy tierSelectorStrategy;

+  private volatile boolean initialized = false;
+
   @Inject
   public BrokerServerView(
       QueryToolChestWarehouse warehouse,
@@ -109,6 +111,7 @@ public class BrokerServerView implements TimelineServerView
           @Override
           public CallbackAction segmentViewInitialized()
           {
+            initialized = true;
             return ServerView.CallbackAction.CONTINUE;
           }
         }
@@ -128,6 +131,11 @@ public class BrokerServerView implements TimelineServerView
     );
   }

+  public boolean isInitialized()
+  {
+    return initialized;
+  }
+
   public void clear()
   {
     synchronized (lock) {
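The flag is flipped by the inventory-view callback thread and read by HTTP threads through `isInitialized()`; `volatile` is what makes the write visible across threads without locking. A stripped-down sketch of that handoff (class and thread names are illustrative):

```java
public class InitFlag
{
  // volatile: a write from the callback thread is immediately visible to readers.
  private volatile boolean initialized = false;

  public void segmentViewInitialized()
  {
    initialized = true;
  }

  public boolean isInitialized()
  {
    return initialized;
  }

  public static void main(String[] args) throws Exception
  {
    final InitFlag flag = new InitFlag();
    Thread callback = new Thread(flag::segmentViewInitialized);
    callback.start();
    callback.join();
    System.out.println(flag.isInitialized()); // true
  }
}
```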

View File

@@ -1,6 +1,6 @@
 /*
  * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
@@ -31,6 +31,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
 import com.google.common.base.Charsets;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -43,9 +44,8 @@ import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import com.metamx.common.logger.Logger;
 import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.io.AppendableByteArrayInputStream;
 import com.metamx.http.client.response.ClientResponse;
-import com.metamx.http.client.response.InputStreamResponseHandler;
+import com.metamx.http.client.response.HttpResponseHandler;
 import com.metamx.http.client.response.StatusResponseHandler;
 import com.metamx.http.client.response.StatusResponseHolder;
 import io.druid.query.BySegmentResultValueClass;
@@ -57,6 +57,8 @@ import io.druid.query.QueryToolChestWarehouse;
 import io.druid.query.QueryWatcher;
 import io.druid.query.Result;
 import io.druid.query.aggregation.MetricManipulatorFns;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
 import org.jboss.netty.handler.codec.http.HttpChunk;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -65,13 +67,19 @@ import javax.ws.rs.core.MediaType;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.SequenceInputStream;
 import java.net.URL;
+import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;

 /**
  */
@@ -143,23 +151,19 @@ public class DirectDruidClient<T> implements QueryRunner<T>
     try {
       log.debug("Querying url[%s]", url);

-      future = httpClient
-          .post(new URL(url))
-          .setContent(objectMapper.writeValueAsBytes(query))
-          .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON)
-          .go(
-              new InputStreamResponseHandler()
-              {
-                long startTime;
-                long byteCount = 0;
+      final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
+      {
+        private long startTime;
+        private final AtomicLong byteCount = new AtomicLong(0);
+        private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
+        private final AtomicBoolean done = new AtomicBoolean(false);

         @Override
-        public ClientResponse<AppendableByteArrayInputStream> handleResponse(HttpResponse response)
+        public ClientResponse<InputStream> handleResponse(HttpResponse response)
         {
           log.debug("Initial response from url[%s]", url);
           startTime = System.currentTimeMillis();
-          byteCount += response.getContent().readableBytes();

           try {
             final String responseContext = response.headers().get("X-Druid-Response-Context");
             // context may be null in case of error or query timeout
@@ -172,39 +176,143 @@ public class DirectDruidClient<T> implements QueryRunner<T>
                 )
             );
           }
+          queue.put(new ChannelBufferInputStream(response.getContent()));
         }
-        catch (IOException e) {
-          log.error(e, "Unable to parse response context from url[%s]", url);
+        catch (final IOException e) {
+          log.error(e, "Error parsing response context from url [%s]", url);
+          return ClientResponse.<InputStream>finished(
+              new InputStream()
+              {
+                @Override
+                public int read() throws IOException
+                {
+                  throw e;
+                }
+              }
+          );
         }
-        return super.handleResponse(response);
+        catch (InterruptedException e) {
+          log.error(e, "Queue appending interrupted");
+          Thread.currentThread().interrupt();
+          throw Throwables.propagate(e);
+        }
+        byteCount.addAndGet(response.getContent().readableBytes());
+        return ClientResponse.<InputStream>finished(
+            new SequenceInputStream(
+                new Enumeration<InputStream>()
+                {
+                  @Override
+                  public boolean hasMoreElements()
+                  {
+                    // Done is always true until the last stream has been put in the queue.
+                    // Then the stream should be spouting good InputStreams.
+                    synchronized (done) {
+                      return !done.get() || !queue.isEmpty();
+                    }
+                  }

+                  @Override
+                  public InputStream nextElement()
+                  {
+                    synchronized (done) {
+                      try {
+                        // Ensures more elements are expected via `done`
+                        return queue.take();
+                      }
+                      catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw Throwables.propagate(e);
+                      }
+                    }
+                  }
+                }
+            )
+        );
       }

       @Override
-      public ClientResponse<AppendableByteArrayInputStream> handleChunk(
-          ClientResponse<AppendableByteArrayInputStream> clientResponse, HttpChunk chunk
+      public ClientResponse<InputStream> handleChunk(
+          ClientResponse<InputStream> clientResponse, HttpChunk chunk
       )
       {
-        final int bytes = chunk.getContent().readableBytes();
-        byteCount += bytes;
-        return super.handleChunk(clientResponse, chunk);
+        final ChannelBuffer channelBuffer = chunk.getContent();
+        final int bytes = channelBuffer.readableBytes();
+        if (bytes > 0) {
+          try {
+            queue.put(new ChannelBufferInputStream(channelBuffer));
+          }
+          catch (InterruptedException e) {
+            log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url);
+            Thread.currentThread().interrupt();
+            throw Throwables.propagate(e);
+          }
+          byteCount.addAndGet(bytes);
+        }
+        return clientResponse;
       }

       @Override
-      public ClientResponse<InputStream> done(ClientResponse<AppendableByteArrayInputStream> clientResponse)
+      public ClientResponse<InputStream> done(ClientResponse<InputStream> clientResponse)
       {
         long stopTime = System.currentTimeMillis();
         log.debug(
             "Completed request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
             url,
-            byteCount,
+            byteCount.get(),
             stopTime - startTime,
-            byteCount / (0.0001 * (stopTime - startTime))
+            byteCount.get() / (0.0001 * (stopTime - startTime))
         );
-        return super.done(clientResponse);
+        synchronized (done) {
+          try {
+            // An empty byte array is put at the end to give SequenceInputStream.close() something to close out
+            // after done is set to true, regardless of the rest of the stream's state.
+            queue.put(ByteSource.empty().openStream());
+          }
+          catch (InterruptedException e) {
+            log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url);
+            Thread.currentThread().interrupt();
+            throw Throwables.propagate(e);
+          }
+          catch (IOException e) {
+            // This should never happen
+            throw Throwables.propagate(e);
+          }
+          finally {
+            done.set(true);
+          }
+        }
+        return ClientResponse.<InputStream>finished(clientResponse.getObj());
+      }
+
+      @Override
+      public void exceptionCaught(final ClientResponse<InputStream> clientResponse, final Throwable e)
+      {
+        // Don't wait for lock in case the lock had something to do with the error
+        synchronized (done) {
+          done.set(true);
+          // Make a best effort to put a zero length buffer into the queue in case something is waiting on the take()
+          // If nothing is waiting on take(), this will be closed out anyways.
+          queue.offer(
+              new InputStream()
+              {
+                @Override
+                public int read() throws IOException
+                {
+                  throw new IOException(e);
+                }
+              }
+          );
+        }
+      }
+    };
+
+    future = httpClient
+        .post(new URL(url))
+        .setContent(objectMapper.writeValueAsBytes(query))
+        .setHeader(
+            HttpHeaders.Names.CONTENT_TYPE,
+            isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
+        )
+        .go(responseHandler);

     queryWatcher.registerQuery(query, future);
@@ -228,7 +336,10 @@ public class DirectDruidClient<T> implements QueryRunner<T>
       StatusResponseHolder res = httpClient
           .delete(new URL(cancelUrl))
           .setContent(objectMapper.writeValueAsBytes(query))
-          .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON)
+          .setHeader(
+              HttpHeaders.Names.CONTENT_TYPE,
+              isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
+          )
           .go(new StatusResponseHandler(Charsets.UTF_8))
           .get();

       if (res.getStatus().getCode() >= 500) {
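The heart of this change: response chunks arriving on Netty's I/O thread are wrapped as `InputStream`s and queued, while the consumer sees one continuous stream via a `SequenceInputStream` whose `Enumeration` blocks on `take()`. A stripped-down, Netty-free sketch of that producer/consumer pattern (it assumes a single producer that sets the done flag before enqueueing a final empty sentinel, mirroring the `done()` callback above):

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Enumeration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class QueueBackedStream
{
  public static void main(String[] args) throws Exception
  {
    final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
    final AtomicBoolean done = new AtomicBoolean(false);

    // Producer: chunks arrive asynchronously, then a final empty sentinel.
    Thread producer = new Thread(() -> {
      try {
        queue.put(new ByteArrayInputStream("hello ".getBytes()));
        queue.put(new ByteArrayInputStream("world".getBytes()));
        done.set(true);
        queue.put(new ByteArrayInputStream(new byte[0])); // wakes any blocked take()
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    // Consumer: one logical stream stitched together from the queued chunks.
    InputStream stream = new SequenceInputStream(
        new Enumeration<InputStream>()
        {
          @Override
          public boolean hasMoreElements()
          {
            // Keep going until the producer is done AND the queue is drained.
            return !done.get() || !queue.isEmpty();
          }

          @Override
          public InputStream nextElement()
          {
            try {
              return queue.take(); // blocks until the next chunk is delivered
            }
            catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              throw new RuntimeException(e);
            }
          }
        }
    );

    int c;
    StringBuilder sb = new StringBuilder();
    while ((c = stream.read()) != -1) {
      sb.append((char) c);
    }
    System.out.println(sb); // hello world
  }
}
```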

View File

@@ -54,7 +54,7 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
   private final CuratorFramework curator;

   private volatile PathChildrenCache loadQueueCache;
-  private volatile boolean started;
+  private volatile boolean started = false;
   private final ListeningExecutorService loadingExec;

   public BaseZkCoordinator(

View File

@@ -0,0 +1,50 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.server.http;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import io.druid.client.BrokerServerView;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Path("/druid/broker/v1")
+public class BrokerResource
+{
+  private final BrokerServerView brokerServerView;
+
+  @Inject
+  public BrokerResource(BrokerServerView brokerServerView)
+  {
+    this.brokerServerView = brokerServerView;
+  }
+
+  @GET
+  @Path("/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getLoadStatus()
+  {
+    return Response.ok(ImmutableMap.of("inventoryInitialized", brokerServerView.isInitialized())).build();
+  }
+}
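Once registered (see `CliBroker` below), the endpoint gives operators a readiness probe, e.g. for rolling restarts: poll until the broker reports an initialized inventory. A hypothetical check (host and port are assumptions; the path and response key come from the resource above):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class LoadStatusCheck
{
  public static void main(String[] args) throws Exception
  {
    // Assumed broker location; the path is the one BrokerResource registers.
    URL url = new URL("http://localhost:8082/druid/broker/v1/loadstatus");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
      // Expected body once the view is ready: {"inventoryInitialized":true}
      System.out.println(reader.readLine());
    }
  }
}
```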

View File

@@ -48,6 +48,7 @@ import io.druid.server.ClientInfoResource;
 import io.druid.server.ClientQuerySegmentWalker;
 import io.druid.server.QueryResource;
 import io.druid.server.coordination.broker.DruidBroker;
+import io.druid.server.http.BrokerResource;
 import io.druid.server.initialization.JettyServerInitializer;
 import io.druid.server.metrics.MetricsModule;
 import io.druid.server.router.TieredBrokerConfig;
@@ -87,6 +88,7 @@ public class CliBroker extends ServerRunnable
     binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);
     binder.bind(CachingClusteredClient.class).in(LazySingleton.class);
+    binder.bind(BrokerServerView.class).in(LazySingleton.class);
     binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class);

     binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
@@ -101,6 +103,7 @@ public class CliBroker extends ServerRunnable
     binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);

     Jerseys.addResource(binder, QueryResource.class);
+    Jerseys.addResource(binder, BrokerResource.class);
     Jerseys.addResource(binder, ClientInfoResource.class);

     LifecycleModule.register(binder, QueryResource.class);
     LifecycleModule.register(binder, DruidBroker.class);