Merge pull request #1622 from metamx/smile-eventreceiverfirehose

Support for JSON Smile format for EventReceiverFirehoseFactory
This commit is contained in:
Charles Allen 2015-08-27 10:42:25 -07:00
commit 10af233b37
5 changed files with 103 additions and 24 deletions

View File

@ -19,6 +19,7 @@ package io.druid.testing.clients;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.api.client.util.Charsets;
import com.google.common.base.Throwables;
import com.metamx.common.ISE;
@ -44,14 +45,22 @@ public class EventReceiverFirehoseTestClient
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final String chatID;
private final ObjectMapper smileMapper;
public EventReceiverFirehoseTestClient(String host, String chatID, ObjectMapper jsonMapper, HttpClient httpClient)
public EventReceiverFirehoseTestClient(
String host,
String chatID,
ObjectMapper jsonMapper,
HttpClient httpClient,
ObjectMapper smileMapper
)
{
this.host = host;
this.jsonMapper = jsonMapper;
this.responseHandler = new StatusResponseHandler(Charsets.UTF_8);
this.httpClient = httpClient;
this.chatID = chatID;
this.smileMapper = smileMapper;
}
private String getURL()
@ -70,15 +79,15 @@ public class EventReceiverFirehoseTestClient
*
* @return
*/
public int postEvents(Collection<Map<String, Object>> events)
public int postEvents(Collection<Map<String, Object>> events, ObjectMapper objectMapper, String mediaType)
{
try {
StatusResponseHolder response = httpClient.go(
new Request(
HttpMethod.POST, new URL(getURL())
).setContent(
MediaType.APPLICATION_JSON,
this.jsonMapper.writeValueAsBytes(events)
mediaType,
objectMapper.writeValueAsBytes(events)
),
responseHandler
).get();
@ -91,7 +100,7 @@ public class EventReceiverFirehoseTestClient
response.getContent()
);
}
Map<String, Integer> responseData = jsonMapper.readValue(
Map<String, Integer> responseData = objectMapper.readValue(
response.getContent(), new TypeReference<Map<String, Integer>>()
{
}
@ -103,18 +112,31 @@ public class EventReceiverFirehoseTestClient
}
}
/**
* Reads each events from file and post them to the indexing service.
* Uses both smileMapper and jsonMapper to send events alternately.
*
* @param file location of file to post events from
*
* @return number of events sent to the indexing service
*/
public int postEventsFromFile(String file)
{
try {
BufferedReader reader = new BufferedReader(
new InputStreamReader(
EventReceiverFirehoseTestClient.class.getResourceAsStream(
file
)
)
);
try (
BufferedReader reader = new BufferedReader(
new InputStreamReader(
EventReceiverFirehoseTestClient.class.getResourceAsStream(
file
)
)
);
) {
String s;
Collection<Map<String, Object>> events = new ArrayList<Map<String, Object>>();
// Test sending events using both jsonMapper and smileMapper.
// sends events one by one using both jsonMapper and smileMapper.
int totalEventsPosted = 0;
while ((s = reader.readLine()) != null) {
events.add(
(Map<String, Object>) this.jsonMapper.readValue(
@ -123,12 +145,19 @@ public class EventReceiverFirehoseTestClient
}
)
);
ObjectMapper mapper = (totalEventsPosted % 2 == 0) ? jsonMapper : smileMapper;
String mediaType = (totalEventsPosted % 2 == 0)
? MediaType.APPLICATION_JSON
: SmileMediaTypes.APPLICATION_JACKSON_SMILE;
totalEventsPosted += postEvents(events, mapper, mediaType);
;
events = new ArrayList<>();
}
int eventsPosted = postEvents(events);
if (eventsPosted != events.size()) {
throw new ISE("All events not posted, expected : %d actual : %d", events.size(), eventsPosted);
if (totalEventsPosted != events.size()) {
throw new ISE("All events not posted, expected : %d actual : %d", events.size(), totalEventsPosted);
}
return eventsPosted;
return totalEventsPosted;
}
catch (Exception e) {
throw Throwables.propagate(e);

View File

@ -19,6 +19,8 @@ package io.druid.tests.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.testing.clients.CoordinatorResourceTestClient;
import io.druid.testing.clients.OverlordResourceTestClient;
import io.druid.testing.utils.FromFileTestQueryHelper;
@ -38,8 +40,11 @@ public abstract class AbstractIndexerTest
@Inject
protected OverlordResourceTestClient indexer;
@Inject
@Json
protected ObjectMapper jsonMapper;
@Inject
@Smile
protected ObjectMapper smileMapper;
@Inject
protected FromFileTestQueryHelper queryHelper;

View File

@ -129,7 +129,8 @@ public class ITRealtimeIndexTaskTest extends AbstractIndexerTest
host,
EVENT_RECEIVER_SERVICE_NAME,
jsonMapper,
httpClient
httpClient,
smileMapper
);
client.postEventsFromFile(EVENT_DATA_FILE);
}

View File

@ -159,7 +159,8 @@ public class ITUnionQueryTest extends AbstractIndexerTest
host,
EVENT_RECEIVER_SERVICE_PREFIX + id,
jsonMapper,
httpClient
httpClient,
smileMapper
);
client.postEventsFromFile(UNION_DATA_FILE);
}

View File

@ -20,6 +20,10 @@ package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
@ -32,9 +36,15 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import java.io.InputStream;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
@ -58,12 +68,16 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
private final String serviceName;
private final int bufferSize;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
@JsonCreator
public EventReceiverFirehoseFactory(
@JsonProperty("serviceName") String serviceName,
@JsonProperty("bufferSize") Integer bufferSize,
@JacksonInject ChatHandlerProvider chatHandlerProvider
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject @Json ObjectMapper jsonMapper,
@JacksonInject @Smile ObjectMapper smileMapper
)
{
Preconditions.checkNotNull(serviceName, "serviceName");
@ -71,6 +85,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
this.serviceName = serviceName;
this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
}
@Override
@ -123,9 +139,30 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
@POST
@Path("/push-events")
@Produces(MediaType.APPLICATION_JSON)
public Response addAll(Collection<Map<String, Object>> events)
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response addAll(
InputStream in,
@Context final HttpServletRequest req // used only to get request content-type
)
{
final String reqContentType = req.getContentType();
final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType);
final String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON;
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
Collection<Map<String, Object>> events = null;
try {
events = objectMapper.readValue(
in, new TypeReference<Collection<Map<String, Object>>>()
{
}
);
}
catch (IOException e) {
return Response.serverError().entity(ImmutableMap.<String, Object>of("error", e.getMessage())).build();
}
log.debug("Adding %,d events to firehose: %s", events.size(), serviceName);
final List<InputRow> rows = Lists.newArrayList();
@ -146,12 +183,18 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRow
}
}
return Response.ok().entity(ImmutableMap.of("eventCount", events.size())).build();
return Response.ok(
objectMapper.writeValueAsString(ImmutableMap.of("eventCount", events.size())),
contentType
).build();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
}
@Override