Support for JSON Smile format for EventReceiverFirehoseFactory

Support for Smile format for EventReceiverFireshoseFactory.
would help in decreasing network load on middle managers ingesting high
data volumes.

review comments
This commit is contained in:
Nishant 2015-08-13 20:02:57 +05:30
parent 1ecec1da5a
commit e6b20db62d
5 changed files with 103 additions and 24 deletions

View File

@ -19,6 +19,7 @@ package io.druid.testing.clients;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.api.client.util.Charsets; import com.google.api.client.util.Charsets;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.metamx.common.ISE; import com.metamx.common.ISE;
@ -44,14 +45,22 @@ public class EventReceiverFirehoseTestClient
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final HttpClient httpClient; private final HttpClient httpClient;
private final String chatID; private final String chatID;
private final ObjectMapper smileMapper;
public EventReceiverFirehoseTestClient(String host, String chatID, ObjectMapper jsonMapper, HttpClient httpClient) public EventReceiverFirehoseTestClient(
String host,
String chatID,
ObjectMapper jsonMapper,
HttpClient httpClient,
ObjectMapper smileMapper
)
{ {
this.host = host; this.host = host;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.responseHandler = new StatusResponseHandler(Charsets.UTF_8); this.responseHandler = new StatusResponseHandler(Charsets.UTF_8);
this.httpClient = httpClient; this.httpClient = httpClient;
this.chatID = chatID; this.chatID = chatID;
this.smileMapper = smileMapper;
} }
private String getURL() private String getURL()
@ -70,15 +79,15 @@ public class EventReceiverFirehoseTestClient
* *
* @return * @return
*/ */
public int postEvents(Collection<Map<String, Object>> events) public int postEvents(Collection<Map<String, Object>> events, ObjectMapper objectMapper, String mediaType)
{ {
try { try {
StatusResponseHolder response = httpClient.go( StatusResponseHolder response = httpClient.go(
new Request( new Request(
HttpMethod.POST, new URL(getURL()) HttpMethod.POST, new URL(getURL())
).setContent( ).setContent(
MediaType.APPLICATION_JSON, mediaType,
this.jsonMapper.writeValueAsBytes(events) objectMapper.writeValueAsBytes(events)
), ),
responseHandler responseHandler
).get(); ).get();
@ -91,7 +100,7 @@ public class EventReceiverFirehoseTestClient
response.getContent() response.getContent()
); );
} }
Map<String, Integer> responseData = jsonMapper.readValue( Map<String, Integer> responseData = objectMapper.readValue(
response.getContent(), new TypeReference<Map<String, Integer>>() response.getContent(), new TypeReference<Map<String, Integer>>()
{ {
} }
@ -103,9 +112,17 @@ public class EventReceiverFirehoseTestClient
} }
} }
/**
* Reads each events from file and post them to the indexing service.
* Uses both smileMapper and jsonMapper to send events alternately.
*
* @param file location of file to post events from
*
* @return number of events sent to the indexing service
*/
public int postEventsFromFile(String file) public int postEventsFromFile(String file)
{ {
try { try (
BufferedReader reader = new BufferedReader( BufferedReader reader = new BufferedReader(
new InputStreamReader( new InputStreamReader(
EventReceiverFirehoseTestClient.class.getResourceAsStream( EventReceiverFirehoseTestClient.class.getResourceAsStream(
@ -113,8 +130,13 @@ public class EventReceiverFirehoseTestClient
) )
) )
); );
) {
String s; String s;
Collection<Map<String, Object>> events = new ArrayList<Map<String, Object>>(); Collection<Map<String, Object>> events = new ArrayList<Map<String, Object>>();
// Test sending events using both jsonMapper and smileMapper.
// sends events one by one using both jsonMapper and smileMapper.
int totalEventsPosted = 0;
while ((s = reader.readLine()) != null) { while ((s = reader.readLine()) != null) {
events.add( events.add(
(Map<String, Object>) this.jsonMapper.readValue( (Map<String, Object>) this.jsonMapper.readValue(
@ -123,12 +145,19 @@ public class EventReceiverFirehoseTestClient
} }
) )
); );
ObjectMapper mapper = (totalEventsPosted % 2 == 0) ? jsonMapper : smileMapper;
String mediaType = (totalEventsPosted % 2 == 0)
? MediaType.APPLICATION_JSON
: SmileMediaTypes.APPLICATION_JACKSON_SMILE;
totalEventsPosted += postEvents(events, mapper, mediaType);
;
events = new ArrayList<>();
} }
int eventsPosted = postEvents(events);
if (eventsPosted != events.size()) { if (totalEventsPosted != events.size()) {
throw new ISE("All events not posted, expected : %d actual : %d", events.size(), eventsPosted); throw new ISE("All events not posted, expected : %d actual : %d", events.size(), totalEventsPosted);
} }
return eventsPosted; return totalEventsPosted;
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);

View File

@ -19,6 +19,8 @@ package io.druid.tests.indexer;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.testing.clients.CoordinatorResourceTestClient; import io.druid.testing.clients.CoordinatorResourceTestClient;
import io.druid.testing.clients.OverlordResourceTestClient; import io.druid.testing.clients.OverlordResourceTestClient;
import io.druid.testing.utils.FromFileTestQueryHelper; import io.druid.testing.utils.FromFileTestQueryHelper;
@ -38,8 +40,11 @@ public abstract class AbstractIndexerTest
@Inject @Inject
protected OverlordResourceTestClient indexer; protected OverlordResourceTestClient indexer;
@Inject @Inject
@Json
protected ObjectMapper jsonMapper; protected ObjectMapper jsonMapper;
@Inject
@Smile
protected ObjectMapper smileMapper;
@Inject @Inject
protected FromFileTestQueryHelper queryHelper; protected FromFileTestQueryHelper queryHelper;

View File

@ -129,7 +129,8 @@ public class ITRealtimeIndexTaskTest extends AbstractIndexerTest
host, host,
EVENT_RECEIVER_SERVICE_NAME, EVENT_RECEIVER_SERVICE_NAME,
jsonMapper, jsonMapper,
httpClient httpClient,
smileMapper
); );
client.postEventsFromFile(EVENT_DATA_FILE); client.postEventsFromFile(EVENT_DATA_FILE);
} }

View File

@ -159,7 +159,8 @@ public class ITUnionQueryTest extends AbstractIndexerTest
host, host,
EVENT_RECEIVER_SERVICE_PREFIX + id, EVENT_RECEIVER_SERVICE_PREFIX + id,
jsonMapper, jsonMapper,
httpClient httpClient,
smileMapper
); );
client.postEventsFromFile(UNION_DATA_FILE); client.postEventsFromFile(UNION_DATA_FILE);
} }

View File

@ -20,6 +20,10 @@ package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
@ -32,9 +36,15 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.Rows; import io.druid.data.input.Rows;
import io.druid.data.input.impl.MapInputRowParser; import io.druid.data.input.impl.MapInputRowParser;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import java.io.InputStream;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST; import javax.ws.rs.POST;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.io.IOException; import java.io.IOException;
@ -58,12 +68,16 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
private final String serviceName; private final String serviceName;
private final int bufferSize; private final int bufferSize;
private final Optional<ChatHandlerProvider> chatHandlerProvider; private final Optional<ChatHandlerProvider> chatHandlerProvider;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
@JsonCreator @JsonCreator
public EventReceiverFirehoseFactory( public EventReceiverFirehoseFactory(
@JsonProperty("serviceName") String serviceName, @JsonProperty("serviceName") String serviceName,
@JsonProperty("bufferSize") Integer bufferSize, @JsonProperty("bufferSize") Integer bufferSize,
@JacksonInject ChatHandlerProvider chatHandlerProvider @JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject @Json ObjectMapper jsonMapper,
@JacksonInject @Smile ObjectMapper smileMapper
) )
{ {
Preconditions.checkNotNull(serviceName, "serviceName"); Preconditions.checkNotNull(serviceName, "serviceName");
@ -71,6 +85,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
this.serviceName = serviceName; this.serviceName = serviceName;
this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize; this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
} }
@Override @Override
@ -123,9 +139,30 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
@POST @POST
@Path("/push-events") @Path("/push-events")
@Produces(MediaType.APPLICATION_JSON) @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response addAll(Collection<Map<String, Object>> events) @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response addAll(
InputStream in,
@Context final HttpServletRequest req // used only to get request content-type
)
{ {
final String reqContentType = req.getContentType();
final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType);
final String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON;
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
Collection<Map<String, Object>> events = null;
try {
events = objectMapper.readValue(
in, new TypeReference<Collection<Map<String, Object>>>()
{
}
);
}
catch (IOException e) {
return Response.serverError().entity(ImmutableMap.<String, Object>of("error", e.getMessage())).build();
}
log.debug("Adding %,d events to firehose: %s", events.size(), serviceName); log.debug("Adding %,d events to firehose: %s", events.size(), serviceName);
final List<InputRow> rows = Lists.newArrayList(); final List<InputRow> rows = Lists.newArrayList();
@ -146,12 +183,18 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
} }
} }
return Response.ok().entity(ImmutableMap.of("eventCount", events.size())).build(); return Response.ok(
objectMapper.writeValueAsString(ImmutableMap.of("eventCount", events.size())),
contentType
).build();
} }
catch (InterruptedException e) { catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
} }
@Override @Override