add emitting request logger

This commit is contained in:
xvrl 2013-04-24 15:38:16 -07:00
parent deefa870ec
commit e3c6bad117
4 changed files with 147 additions and 2 deletions

View File

@ -246,7 +246,20 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
{
if (requestLogger == null) {
try {
setRequestLogger(Initialization.makeRequestLogger(getJsonMapper(), getScheduledExecutorFactory(), getProps()));
final String loggingType = props.getProperty("druid.request.logging.type");
if(loggingType.equals("emitter")) {
setRequestLogger(Initialization.makeEmittingRequestLogger(
getProps(),
getEmitter()
));
}
else {
setRequestLogger(Initialization.makeFileRequestLogger(
getJsonMapper(),
getScheduledExecutorFactory(),
getProps()
));
}
}
catch (IOException e) {
throw Throwables.propagate(e);

View File

@ -0,0 +1,97 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.http;
import com.google.common.collect.ImmutableMap;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.Event;
import org.joda.time.DateTime;
import java.util.Map;
public class EmittingRequestLogger implements RequestLogger
{
final String service;
final String host;
final Emitter emitter;
final String feed;
public EmittingRequestLogger(String service, String host, Emitter emitter, String feed)
{
this.emitter = emitter;
this.host = host;
this.service = service;
this.feed = feed;
}
@Override
public void log(final RequestLogLine requestLogLine) throws Exception
{
emitter.emit(new RequestLogEvent(service, host, feed, requestLogLine));
}
public static class RequestLogEvent implements Event
{
final String service;
final String host;
final String feed;
final RequestLogLine request;
public RequestLogEvent(String service, String host, String feed, RequestLogLine request)
{
this.service = service;
this.host = host;
this.request = request;
this.feed = feed;
}
@Override
public Map<String, Object> toMap()
{
return ImmutableMap.<String, Object>builder()
.put("feed", getFeed())
.put("timestamp", request.getTimestamp())
.put("service", service)
.put("host", host)
.put("query", request.getQuery())
.put("remoteAddr", request.getRemoteAddr())
.build();
}
@Override
public String getFeed()
{
return feed;
}
@Override
public DateTime getCreatedTime()
{
return request.getTimestamp();
}
@Override
public boolean isSafeToBuffer()
{
return true;
}
}
}

View File

@ -19,6 +19,7 @@
package com.metamx.druid.http;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
@ -52,4 +53,22 @@ public class RequestLogLine
)
);
}
@JsonProperty("timestamp")
public DateTime getTimestamp()
{
return timestamp;
}
@JsonProperty("query")
public Query getQuery()
{
return query;
}
@JsonProperty("remoteAddr")
public String getRemoteAddr()
{
return remoteAddr;
}
}

View File

@ -27,11 +27,13 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.ZKPhoneBook;
import com.metamx.druid.http.EmittingRequestLogger;
import com.metamx.druid.http.FileRequestLogger;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.utils.PropUtils;
import com.metamx.druid.zk.PropertiesZkSerializer;
import com.metamx.druid.zk.StringZkSerializer;
import com.metamx.emitter.core.Emitter;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.ExponentialBackoffRetry;
@ -348,7 +350,11 @@ public class Initialization
return serviceProvider;
}
public static RequestLogger makeRequestLogger(ObjectMapper objectMapper, ScheduledExecutorFactory factory, Properties props) throws IOException
public static RequestLogger makeFileRequestLogger(
ObjectMapper objectMapper,
ScheduledExecutorFactory factory,
Properties props
) throws IOException
{
return new FileRequestLogger(
objectMapper,
@ -357,6 +363,16 @@ public class Initialization
);
}
public static RequestLogger makeEmittingRequestLogger(Properties props, Emitter emitter)
{
return new EmittingRequestLogger(
PropUtils.getProperty(props, "druid.service"),
PropUtils.getProperty(props, "druid.host"),
emitter,
PropUtils.getProperty(props, "druid.request.logging.feed")
);
}
public static String makePropPath(String basePath)
{
return String.format("%s/%s", basePath, PROP_SUBPATH);