YARN-4179. [reader implementation] support flow activity queries based on time (Varun Saxena via sjlee)

This commit is contained in:
Sangjin Lee 2015-10-22 17:41:40 -07:00
parent d014f2ffd2
commit e3e857866d
5 changed files with 232 additions and 9 deletions

View File

@ -140,7 +140,15 @@ public void setCluster(String cluster) {
}
public Date getDate() {
return (Date)getInfo().get(DATE_INFO_KEY);
Object date = getInfo().get(DATE_INFO_KEY);
if (date != null) {
if (date instanceof Long) {
return new Date((Long)date);
} else if (date instanceof Date) {
return (Date)date;
}
}
return null;
}
public void setDate(long time) {

View File

@ -19,12 +19,18 @@
package org.apache.hadoop.yarn.server.timelineservice.reader;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
@ -54,6 +60,7 @@
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Singleton;
/** REST end point for Timeline Reader */
@ -70,11 +77,96 @@ public class TimelineReaderWebServices {
private static final String COMMA_DELIMITER = ",";
private static final String COLON_DELIMITER = ":";
private static final String QUERY_STRING_SEP = "?";
private static final String RANGE_DELIMITER = "-";
private static final String DATE_PATTERN = "yyyyMMdd";
@VisibleForTesting
static ThreadLocal<DateFormat> DATE_FORMAT = new ThreadLocal<DateFormat>() {
@Override
protected DateFormat initialValue() {
SimpleDateFormat format =
new SimpleDateFormat(DATE_PATTERN, Locale.ENGLISH);
format.setTimeZone(TimeZone.getTimeZone("GMT"));
format.setLenient(false);
return format;
}
};
private void init(HttpServletResponse response) {
response.setContentType(null);
}
private static class DateRange {
Long dateStart;
Long dateEnd;
private DateRange(Long start, Long end) {
this.dateStart = start;
this.dateEnd = end;
}
}
private static long parseDate(String strDate) throws ParseException {
Date date = DATE_FORMAT.get().parse(strDate);
return date.getTime();
}
/**
* Parses date range which can be a single date or in the format
* "[startdate]-[enddate]" where either of start or end date may not exist.
* @param dateRange
* @return a {@link DateRange} object.
* @throws IllegalArgumentException
*/
private static DateRange parseDateRange(String dateRange)
throws IllegalArgumentException {
if (dateRange == null || dateRange.isEmpty()) {
return new DateRange(null, null);
}
// Split date range around "-" fetching two components indicating start and
// end date.
String[] dates = dateRange.split(RANGE_DELIMITER, 2);
Long start = null;
Long end = null;
try {
String startDate = dates[0].trim();
if (!startDate.isEmpty()) {
// Start date is not in yyyyMMdd format.
if (startDate.length() != DATE_PATTERN.length()) {
throw new IllegalArgumentException("Invalid date range " + dateRange);
}
// Parse start date which exists before "-" in date range.
// If "-" does not exist in date range, this effectively
// gives single date.
start = parseDate(startDate);
}
if (dates.length > 1) {
String endDate = dates[1].trim();
if (!endDate.isEmpty()) {
// End date is not in yyyyMMdd format.
if (endDate.length() != DATE_PATTERN.length()) {
throw new IllegalArgumentException(
"Invalid date range " + dateRange);
}
// Parse end date which exists after "-" in date range.
end = parseDate(endDate);
}
} else {
// Its a single date(without "-" in date range), so set
// end equal to start.
end = start;
}
if (start != null && end != null) {
if (start > end) {
throw new IllegalArgumentException("Invalid date range " + dateRange);
}
}
return new DateRange(start, end);
} catch (ParseException e) {
// Date could not be parsed.
throw new IllegalArgumentException("Invalid date range " + dateRange);
}
}
private static Set<String> parseValuesStr(String str, String delimiter) {
if (str == null || str.isEmpty()) {
return null;
@ -205,7 +297,8 @@ private static void handleException(Exception e, String url, long startTime,
if (e instanceof NumberFormatException) {
throw new BadRequestException(invalidNumMsg + " is not a numeric value.");
} else if (e instanceof IllegalArgumentException) {
throw new BadRequestException("Requested Invalid Field.");
throw new BadRequestException(e.getMessage() == null ?
"Requested Invalid Field." : e.getMessage());
} else {
LOG.error("Error while processing REST request", e);
throw new WebApplicationException(e,
@ -514,8 +607,20 @@ public Set<TimelineEntity> getFlowRuns(
}
/**
* Return a list of flows for a given cluster id. Cluster ID is not
* provided by client so default cluster ID has to be taken.
* Return a list of flows. Cluster ID is not provided by client so default
* cluster ID has to be taken. daterange, if specified is given as
* "[startdate]-[enddate]"(i.e. start and end date separated by -) or
* single date. Dates are interpreted in yyyyMMdd format and are assumed to
* be in GMT. If a single date is specified, all flows active on that date are
* returned. If both startdate and enddate is given, all flows active between
* start and end date will be returned. If only startdate is given, flows
* active on and after startdate are returned. If only enddate is given, flows
* active on and before enddate are returned.
* For example :
* "daterange=20150711" returns flows active on 20150711.
* "daterange=20150711-20150714" returns flows active between these 2 dates.
* "daterange=20150711-" returns flows active on and after 20150711.
* "daterange=-20150711" returns flows active on and before 20150711.
*/
@GET
@Path("/flows/")
@ -524,12 +629,25 @@ public Set<TimelineEntity> getFlows(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
@QueryParam("limit") String limit,
@QueryParam("daterange") String dateRange,
@QueryParam("fields") String fields) {
return getFlows(req, res, null, limit, fields);
return getFlows(req, res, null, limit, dateRange, fields);
}
/**
* Return a list of flows for a given cluster id.
* Return a list of flows for a given cluster id. daterange, if specified is
* given as "[startdate]-[enddate]"(i.e. start and end date separated by -) or
* single date. Dates are interpreted in yyyyMMdd format and are assumed to
* be in GMT. If a single date is specified, all flows active on that date are
* returned. If both startdate and enddate is given, all flows active between
* start and end date will be returned. If only startdate is given, flows
* active on and after startdate are returned. If only enddate is given, flows
* active on and before enddate are returned.
* For example :
* "daterange=20150711" returns flows active on 20150711.
* "daterange=20150711-20150714" returns flows active between these 2 dates.
* "daterange=20150711-" returns flows active on and after 20150711.
* "daterange=-20150711" returns flows active on and before 20150711.
*/
@GET
@Path("/flows/{clusterid}/")
@ -539,6 +657,7 @@ public Set<TimelineEntity> getFlows(
@Context HttpServletResponse res,
@PathParam("clusterid") String clusterId,
@QueryParam("limit") String limit,
@QueryParam("daterange") String dateRange,
@QueryParam("fields") String fields) {
String url = req.getRequestURI() +
(req.getQueryString() == null ? "" :
@ -550,11 +669,12 @@ public Set<TimelineEntity> getFlows(
TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
Set<TimelineEntity> entities = null;
try {
DateRange range = parseDateRange(dateRange);
entities = timelineReaderManager.getEntities(
null, parseStr(clusterId), null, null, null,
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), parseLongStr(limit),
null, null, null, null, null, null, null, null, null, null,
parseFieldsStr(fields, COMMA_DELIMITER));
range.dateStart, range.dateEnd, null, null, null, null, null, null,
null, null, parseFieldsStr(fields, COMMA_DELIMITER));
} catch (Exception e) {
handleException(e, url, startTime, "limit");
}

View File

@ -87,6 +87,12 @@ protected void augmentParams(Configuration hbaseConf, Connection conn)
if (limit == null || limit < 0) {
limit = TimelineReader.DEFAULT_LIMIT;
}
if (createdTimeBegin == null) {
createdTimeBegin = DEFAULT_BEGIN_TIME;
}
if (createdTimeEnd == null) {
createdTimeEnd = DEFAULT_END_TIME;
}
}
@Override
@ -100,7 +106,16 @@ protected Result getResult(Configuration hbaseConf, Connection conn)
protected ResultScanner getResults(Configuration hbaseConf,
Connection conn) throws IOException {
Scan scan = new Scan();
scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
if (createdTimeBegin == DEFAULT_BEGIN_TIME &&
createdTimeEnd == DEFAULT_END_TIME) {
scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
} else {
scan.setStartRow(
FlowActivityRowKey.getRowKeyPrefix(clusterId, createdTimeEnd));
scan.setStopRow(
FlowActivityRowKey.getRowKeyPrefix(clusterId,
(createdTimeBegin <= 0 ? 0: (createdTimeBegin - 1))));
}
// use the page filter to limit the result to the page size
// the scanner may still return more than the limit; therefore we need to
// read the right number as we iterate

View File

@ -55,10 +55,31 @@ public String getFlowId() {
return flowId;
}
/**
* Constructs a row key prefix for the flow activity table as follows:
* {@code clusterId!}
*
* @param clusterId
* @return byte array with the row key prefix
*/
public static byte[] getRowKeyPrefix(String clusterId) {
return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, ""));
}
/**
* Constructs a row key prefix for the flow activity table as follows:
* {@code clusterId!dayTimestamp!}
*
* @param clusterId
* @param dayTs
* @return byte array with the row key prefix
*/
public static byte[] getRowKeyPrefix(String clusterId, long dayTs) {
return Separator.QUALIFIERS.join(
Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)), new byte[0]);
}
/**
* Constructs a row key for the flow activity table as follows:
* {@code clusterId!dayTimestamp!user!flowId}

View File

@ -27,6 +27,7 @@
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.text.DateFormat;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -48,6 +49,7 @@
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.junit.After;
import org.junit.AfterClass;
@ -70,6 +72,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
private TimelineReaderServer server;
private static HBaseTestingUtility util;
private static long ts = System.currentTimeMillis();
private static long dayTs =
TimelineStorageUtils.getTopOfTheDayTimestamp(ts);
@BeforeClass
public static void setup() throws Exception {
@ -509,6 +513,61 @@ public void testGetFlows() throws Exception {
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get();
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flows/cluster1?daterange=" + fmt.format(dayTs) + "-" +
fmt.format(dayTs + (2*86400000L)));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (FlowActivityEntity entity : entities) {
assertTrue((entity.getId().endsWith("@flow_name") &&
entity.getFlowRuns().size() == 2) ||
(entity.getId().endsWith("@flow_name2") &&
entity.getFlowRuns().size() == 1));
}
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flows/cluster1?daterange=" +
fmt.format(dayTs + (4*86400000L)));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(entities);
assertEquals(0, entities.size());
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flows/cluster1?daterange=-" +
fmt.format(dayTs + (2*86400000L)));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flows/cluster1?daterange=" +
fmt.format(dayTs - (2*86400000L)) + "-");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flows/cluster1?daterange=20150711:20150714");
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flows/cluster1?daterange=20150714-20150711");
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flows/cluster1?daterange=2015071129-20150712");
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flows/cluster1?daterange=20150711-2015071243");
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
} finally {
client.destroy();
}