YARN-6027. Support fromid(offset) filter for /flows API (Rohith Sharma K S via Varun Saxena)

Varun Saxena 2017-03-02 01:49:34 +05:30
parent cf30b3b914
commit 8bb2646595
10 changed files with 548 additions and 274 deletions

AbstractTimelineReaderHBaseTestBase.java

@@ -0,0 +1,176 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.reader;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.List;

import javax.ws.rs.core.MediaType;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.junit.Assert;

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;

/**
 * Test base class for TimelineReaderServer HBase tests.
 */
public abstract class AbstractTimelineReaderHBaseTestBase {
  private static int serverPort;
  private static TimelineReaderServer server;
  private static HBaseTestingUtility util;

  public static void setup() throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    conf.setInt("hfile.format.version", 3);
    util.startMiniCluster();
    DataGeneratorForTest.createSchema(util.getConfiguration());
  }

  public static void tearDown() throws Exception {
    if (server != null) {
      server.stop();
      server = null;
    }
    if (util != null) {
      util.shutdownMiniCluster();
    }
  }

  protected static void initialize() throws Exception {
    try {
      Configuration config = util.getConfiguration();
      config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
      config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
      config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
          "localhost:0");
      config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
      config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
          "org.apache.hadoop.yarn.server.timelineservice.storage."
              + "HBaseTimelineReaderImpl");
      config.setInt("hfile.format.version", 3);
      server = new TimelineReaderServer() {
        @Override
        protected void setupOptions(Configuration conf) {
          // The parent code tries to use HttpServer2 from this version of
          // Hadoop, but the tests are loading in HttpServer2 from
          // ${hbase-compatible-hadoop.version}. This version uses Jetty 9
          // while ${hbase-compatible-hadoop.version} uses Jetty 6, and there
          // are many differences, including classnames and packages.
          // We do nothing here, so that we don't cause a NoSuchMethodError.
          // Once ${hbase-compatible-hadoop.version} is changed to Hadoop 3,
          // we should be able to remove this @Override.
        }
      };
      server.init(config);
      server.start();
      serverPort = server.getWebServerPort();
    } catch (Exception e) {
      Assert.fail("Web server failed to start: " + e.getMessage());
    }
  }

  protected Client createClient() {
    ClientConfig cfg = new DefaultClientConfig();
    cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
    return new Client(
        new URLConnectionClientHandler(new DummyURLConnectionFactory()), cfg);
  }

  protected ClientResponse getResponse(Client client, URI uri)
      throws Exception {
    ClientResponse resp =
        client.resource(uri).accept(MediaType.APPLICATION_JSON)
            .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
    if (resp == null || resp.getStatusInfo()
        .getStatusCode() != ClientResponse.Status.OK.getStatusCode()) {
      String msg = "";
      if (resp != null) {
        msg = String.valueOf(resp.getStatusInfo().getStatusCode());
      }
      throw new IOException(
          "Incorrect response from timeline reader. Status=" + msg);
    }
    return resp;
  }

  protected void verifyHttpResponse(Client client, URI uri, Status status) {
    ClientResponse resp =
        client.resource(uri).accept(MediaType.APPLICATION_JSON)
            .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
    assertNotNull(resp);
    assertTrue("Response from server should have been " + status,
        resp.getStatusInfo().getStatusCode() == status.getStatusCode());
    System.out.println("Response is: " + resp.getEntity(String.class));
  }

  protected List<FlowActivityEntity> verifyFlowEntites(Client client, URI uri,
      int noOfEntities) throws Exception {
    ClientResponse resp = getResponse(client, uri);
    List<FlowActivityEntity> entities =
        resp.getEntity(new GenericType<List<FlowActivityEntity>>() {
        });
    assertNotNull(entities);
    assertEquals(noOfEntities, entities.size());
    return entities;
  }

  protected static class DummyURLConnectionFactory
      implements HttpURLConnectionFactory {
    @Override
    public HttpURLConnection getHttpURLConnection(final URL url)
        throws IOException {
      try {
        return (HttpURLConnection) url.openConnection();
      } catch (UndeclaredThrowableException e) {
        throw new IOException(e.getCause());
      }
    }
  }

  protected static HBaseTestingUtility getHBaseTestingUtility() {
    return util;
  }

  public static int getServerPort() {
    return serverPort;
  }
}
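For orientation, a concrete test wires these static helpers into the JUnit 4 lifecycle. A minimal sketch, assuming the JUnit 4 conventions used elsewhere in this module; the class name and the commented data-loading step are illustrative, not part of this commit:

import java.net.URI;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import com.sun.jersey.api.client.Client;

public class TestFlowsPagination extends AbstractTimelineReaderHBaseTestBase {

  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    setup();       // start the HBase mini cluster and create the schema
    // (data loading would go here, e.g. via DataGeneratorForTest)
    initialize();  // start TimelineReaderServer backed by HBase storage
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    tearDown();
  }

  @Test
  public void testGetFlows() throws Exception {
    Client client = createClient();
    try {
      URI uri = URI.create("http://localhost:" + getServerPort()
          + "/ws/v2/timeline/flows");
      verifyFlowEntites(client, uri, 0); // no data loaded yet
    } finally {
      client.destroy();
    }
  }
}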

KeyConverterToString.java

@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;

/**
 * Interface that must be implemented to encode and decode row keys or column
 * qualifiers as strings.
 */
public interface KeyConverterToString<T> {
  /**
   * Encodes a key as a string.
   *
   * @param key key of type T to be encoded as a string.
   * @return encoded value as a string.
   */
  String encodeAsString(T key);

  /**
   * Decodes a row key from its string representation to a key of type T.
   *
   * @param encodedKey string representation of the row key.
   * @return the key of type T constructed by decoding the string.
   */
  T decodeFromString(String encodedKey);
}
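To make the contract concrete, here is a minimal sketch of an implementation for a hypothetical two-part key. SampleKey and SampleKeyConverter are illustrative names only; like the real FlowActivityRowKeyConverter below, the sketch delegates escaping to TimelineReaderUtils:

import java.util.List;

import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;

public final class SampleKeyConverter
    implements KeyConverterToString<SampleKeyConverter.SampleKey> {

  /** Hypothetical two-part key: cluster and flow name. */
  public static final class SampleKey {
    private final String clusterId;
    private final String flowName;

    public SampleKey(String clusterId, String flowName) {
      this.clusterId = clusterId;
      this.flowName = flowName;
    }
  }

  @Override
  public String encodeAsString(SampleKey key) {
    // Escapes any '!' or '*' inside the components before joining with '!'.
    return TimelineReaderUtils.joinAndEscapeStrings(
        new String[] {key.clusterId, key.flowName});
  }

  @Override
  public SampleKey decodeFromString(String encodedKey) {
    List<String> parts = TimelineReaderUtils.split(encodedKey);
    if (parts == null || parts.size() != 2) {
      throw new IllegalArgumentException("Invalid sample key: " + encodedKey);
    }
    return new SampleKey(parts.get(0), parts.get(1));
  }
}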

FlowActivityRowKey.java

@@ -17,10 +17,14 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
/**
@@ -32,8 +36,8 @@ public class FlowActivityRowKey {
  private final Long dayTs;
  private final String userId;
  private final String flowName;
-  private final KeyConverter<FlowActivityRowKey> flowActivityRowKeyConverter =
-      new FlowActivityRowKeyConverter();
+  private final FlowActivityRowKeyConverter
+      flowActivityRowKeyConverter = new FlowActivityRowKeyConverter();
/**
* @param clusterId identifying the cluster
@@ -103,14 +107,33 @@ public class FlowActivityRowKey {
    return new FlowActivityRowKeyConverter().decode(rowKey);
  }

  /**
   * Constructs a row key for the flow activity table as follows:
   * {@code clusterId!dayTimestamp!user!flowName}.
   *
   * @return String representation of the row key.
   */
  public String getRowKeyAsString() {
    return flowActivityRowKeyConverter.encodeAsString(this);
  }

  /**
   * Given the raw row key as a string, returns the row key as an object.
   *
   * @param encodedRowKey String representation of the row key.
   * @return A <cite>FlowActivityRowKey</cite> object.
   */
  public static FlowActivityRowKey parseRowKeyFromString(String encodedRowKey) {
    return new FlowActivityRowKeyConverter().decodeFromString(encodedRowKey);
  }

  /**
   * Encodes and decodes the row key for the flow activity table. The row key
   * is of the form {@code clusterId!dayTimestamp!user!flowName}, where
   * dayTimestamp (the top-of-the-day timestamp) is a long and the rest are
   * strings.
   */
-  final private static class FlowActivityRowKeyConverter implements
-      KeyConverter<FlowActivityRowKey> {
+  final private static class FlowActivityRowKeyConverter
+      implements KeyConverter<FlowActivityRowKey>,
+      KeyConverterToString<FlowActivityRowKey> {

    private FlowActivityRowKeyConverter() {
    }
@@ -192,5 +215,33 @@ public class FlowActivityRowKey {
          Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
      return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
    }

    @Override
    public String encodeAsString(FlowActivityRowKey key) {
      if (key.getDayTimestamp() == null) {
        return TimelineReaderUtils
            .joinAndEscapeStrings(new String[] {key.clusterId});
      } else if (key.getUserId() == null) {
        return TimelineReaderUtils.joinAndEscapeStrings(
            new String[] {key.clusterId, key.dayTs.toString()});
      } else if (key.getFlowName() == null) {
        return TimelineReaderUtils.joinAndEscapeStrings(
            new String[] {key.clusterId, key.dayTs.toString(), key.userId});
      }
      return TimelineReaderUtils.joinAndEscapeStrings(new String[] {
          key.clusterId, key.dayTs.toString(), key.userId, key.flowName});
    }

    @Override
    public FlowActivityRowKey decodeFromString(String encodedRowKey) {
      List<String> split = TimelineReaderUtils.split(encodedRowKey);
      if (split == null || split.size() != 4) {
        throw new IllegalArgumentException(
            "Invalid row key for flow activity.");
      }
      Long dayTs = Long.valueOf(split.get(1));
      return new FlowActivityRowKey(split.get(0), dayTs, split.get(2),
          split.get(3));
    }
  }
}
}
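The string form produced by this converter is exactly what the /flows endpoint hands back under the FROM_ID info key. A sketch of the round trip (values illustrative; the public constructor snaps the timestamp to the top of the day, as the test further below also verifies):

FlowActivityRowKey key =
    new FlowActivityRowKey("cluster1", 1459900830000L, "user1", "flow_name");
String fromId = key.getRowKeyAsString();
// e.g. "cluster1!1459900800000!user1!flow_name"; any '!' or '*' inside a
// component would be escaped with '*'.
FlowActivityRowKey parsed = FlowActivityRowKey.parseRowKeyFromString(fromId);
// parsed.getClusterId() == "cluster1", parsed.getFlowName() == "flow_name"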

FlowActivityEntityReader.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityCo
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
import org.apache.hadoop.yarn.webapp.BadRequestException;

import com.google.common.base.Preconditions;
@@ -110,11 +112,30 @@ class FlowActivityEntityReader extends TimelineEntityReader {
      Connection conn, FilterList filterList) throws IOException {
    Scan scan = new Scan();
    String clusterId = getContext().getClusterId();
-    if (getFilters().getCreatedTimeBegin() == 0L &&
-        getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) {
+    if (getFilters().getFromId() == null
+        && getFilters().getCreatedTimeBegin() == 0L
+        && getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) {
      // All records have to be chosen.
      scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId)
          .getRowKeyPrefix());
    } else if (getFilters().getFromId() != null) {
      FlowActivityRowKey key = null;
      try {
        key =
            FlowActivityRowKey.parseRowKeyFromString(getFilters().getFromId());
      } catch (IllegalArgumentException e) {
        throw new BadRequestException("Invalid filter fromid is provided.");
      }
      if (!clusterId.equals(key.getClusterId())) {
        throw new BadRequestException(
            "fromid doesn't belong to clusterId=" + clusterId);
      }
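      // Day timestamps are stored inverted in the row key, so the scan runs
      // from newer days to older ones: start at the fromid row (inclusive)
      // and stop at the prefix for createdTimeBegin - 1 (exclusive), which
      // still includes rows stamped exactly createdTimeBegin.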
      scan.setStartRow(key.getRowKey());
      scan.setStopRow(
          new FlowActivityRowKeyPrefix(clusterId,
              (getFilters().getCreatedTimeBegin() <= 0 ? 0
                  : (getFilters().getCreatedTimeBegin() - 1)))
                      .getRowKeyPrefix());
    } else {
      scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters()
          .getCreatedTimeEnd()).getRowKeyPrefix());
@@ -157,7 +178,8 @@ class FlowActivityEntityReader extends TimelineEntityReader {
      flowRun.setId(flowRun.getId());
      flowActivity.addFlowRun(flowRun);
    }
    flowActivity.getInfo().put(TimelineReaderUtils.FROMID_KEY,
        rowKey.getRowKeyAsString());
    return flowActivity;
  }
}

TestRowKeys.java

@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
@@ -224,6 +225,26 @@ public class TestRowKeys {
    verifyRowPrefixBytes(byteRowKeyPrefix);
  }

  @Test
  public void testFlowActivityRowKeyAsString() {
    String cluster = "cl" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR
        + "uster" + TimelineReaderUtils.DEFAULT_ESCAPE_CHAR;
    String user = TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "user";
    String fName = "dummy_" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR
        + TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "flow"
        + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR;
    Long ts = 1459900830000L;
    Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
    String rowKeyAsString =
        new FlowActivityRowKey(cluster, ts, user, fName).getRowKeyAsString();
    FlowActivityRowKey rowKey =
        FlowActivityRowKey.parseRowKeyFromString(rowKeyAsString);
    assertEquals(cluster, rowKey.getClusterId());
    assertEquals(dayTimestamp, rowKey.getDayTimestamp());
    assertEquals(user, rowKey.getUserId());
    assertEquals(fName, rowKey.getFlowName());
  }

  @Test
  public void testFlowRunRowKey() {
    byte[] byteRowKey =

TimelineReaderServer.java

@@ -185,7 +185,7 @@ public class TimelineReaderServer extends CompositeService {
  }

  @VisibleForTesting
-  int getWebServerPort() {
+  public int getWebServerPort() {
    return readerWebServer.getConnectorAddress(0).getPort();
  }

TimelineReaderUtils.java

@@ -24,13 +24,29 @@ import java.util.List;
import org.apache.commons.lang.StringUtils;

import com.google.common.annotations.VisibleForTesting;

/**
 * Set of utility methods to be used across timeline reader.
 */
-final class TimelineReaderUtils {
+public final class TimelineReaderUtils {
  private TimelineReaderUtils() {
  }

  /**
   * Default delimiter for joining strings.
   */
  @VisibleForTesting
  public static final char DEFAULT_DELIMITER_CHAR = '!';

  /**
   * Default escape character used for joining strings.
   */
  @VisibleForTesting
  public static final char DEFAULT_ESCAPE_CHAR = '*';

  /**
   * Info key under which the FROM_ID pagination offset is returned to clients.
   */
  public static final String FROMID_KEY = "FROM_ID";

  /**
   * Split the passed string along the passed delimiter character while looking
   * for the escape char to interpret the split parts correctly. For delimiter or
@@ -168,4 +184,14 @@ final class TimelineReaderUtils {
    // Join the strings after they have been escaped.
    return StringUtils.join(strs, delimiterChar);
  }

  public static List<String> split(final String str)
      throws IllegalArgumentException {
    return split(str, DEFAULT_DELIMITER_CHAR, DEFAULT_ESCAPE_CHAR);
  }

  public static String joinAndEscapeStrings(final String[] strs) {
    return joinAndEscapeStrings(strs, DEFAULT_DELIMITER_CHAR,
        DEFAULT_ESCAPE_CHAR);
  }
}
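A quick sketch of what these defaults do to values that contain the delimiter or escape character (the outputs in the comments are the expected behaviour, not taken from the commit):

String joined = TimelineReaderUtils.joinAndEscapeStrings(
    new String[] {"cluster!1", "flow*name"});
// joined is expected to be "cluster*!1!flow**name"
List<String> parts = TimelineReaderUtils.split(joined);
// parts is expected to be ["cluster!1", "flow*name"]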

TimelineReaderWebServices.java

@@ -1334,6 +1334,10 @@
   *     2 dates.
   *     "daterange=20150711-" returns flows active on and after 20150711.
   *     "daterange=-20150711" returns flows active on and before 20150711.
   * @param fromId If specified, retrieve the next set of flows starting from
   *     the given fromId. The set of flows retrieved is inclusive of the
   *     specified fromId. fromId should be taken from the value associated
   *     with the FROM_ID info key in a flow entity response received earlier.
   *
   * @return If successful, an HTTP 200 (OK) response with JSON representing a
   *     set of <cite>FlowActivityEntity</cite> instances is returned.<br>
@@ -1350,8 +1354,9 @@
      @Context HttpServletRequest req,
      @Context HttpServletResponse res,
      @QueryParam("limit") String limit,
-      @QueryParam("daterange") String dateRange) {
-    return getFlows(req, res, null, limit, dateRange);
+      @QueryParam("daterange") String dateRange,
+      @QueryParam("fromid") String fromId) {
+    return getFlows(req, res, null, limit, dateRange, fromId);
  }
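Taken together with the FROM_ID info key, this parameter gives clients a simple pagination loop. A minimal sketch built on the Jersey helpers from the test base above; the host, port, and limit value are assumptions:

Client client = createClient();
String base = "http://localhost:" + getServerPort()
    + "/ws/v2/timeline/flows?limit=10";
List<FlowActivityEntity> page = getResponse(client, URI.create(base))
    .getEntity(new GenericType<List<FlowActivityEntity>>() {
    });
while (page.size() > 1) {
  // The last entity of a page carries FROM_ID in its info map; passing it
  // back returns the next page, inclusive of that same entity.
  String fromId = (String) page.get(page.size() - 1).getInfo()
      .get(TimelineReaderUtils.FROMID_KEY);
  page = getResponse(client, URI.create(base + "&fromid=" + fromId))
      .getEntity(new GenericType<List<FlowActivityEntity>>() {
      });
}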
/**
@@ -1380,6 +1385,10 @@
   *     2 dates.
   *     "daterange=20150711-" returns flows active on and after 20150711.
   *     "daterange=-20150711" returns flows active on and before 20150711.
   * @param fromId If specified, retrieve the next set of flows starting from
   *     the given fromId. The set of flows retrieved is inclusive of the
   *     specified fromId. fromId should be taken from the value associated
   *     with the FROM_ID info key in a flow entity response received earlier.
   *
   * @return If successful, an HTTP 200 (OK) response with JSON representing a
   *     set of <cite>FlowActivityEntity</cite> instances is returned.<br>
@@ -1397,7 +1406,8 @@
      @Context HttpServletResponse res,
      @PathParam("clusterid") String clusterId,
      @QueryParam("limit") String limit,
-      @QueryParam("daterange") String dateRange) {
+      @QueryParam("daterange") String dateRange,
+      @QueryParam("fromid") String fromId) {
    String url = req.getRequestURI() +
        (req.getQueryString() == null ? "" :
            QUERY_STRING_SEP + req.getQueryString());
@@ -1414,7 +1424,7 @@
    TimelineEntityFilters entityFilters =
        TimelineReaderWebServicesUtils.createTimelineEntityFilters(
            limit, null, null, null, null, null, null, null, null, null,
-            null);
+            fromId);
    entityFilters.setCreatedTimeBegin(range.dateStart);
    entityFilters.setCreatedTimeEnd(range.dateEnd);
    entities = timelineReaderManager.getEntities(

TimelineUIDConverter.java

@@ -195,39 +195,29 @@ enum TimelineUIDConverter {
  };

  /**
-   * Delimiter used for UID.
-   */
-  public static final char UID_DELIMITER_CHAR = '!';
-
-  /**
-   * Escape Character used if delimiter or escape character itself is part of
-   * different components of UID.
-   */
-  public static final char UID_ESCAPE_CHAR = '*';
-
-  /**
-   * Split UID using {@link #UID_DELIMITER_CHAR} and {@link #UID_ESCAPE_CHAR}.
+   * Split UID using {@link TimelineReaderUtils#DEFAULT_DELIMITER_CHAR} and
+   * {@link TimelineReaderUtils#DEFAULT_ESCAPE_CHAR}.
   * @param uid UID to be split.
   * @return a list of different parts of UID split across the delimiter.
   * @throws IllegalArgumentException if UID is not properly escaped.
   */
  private static List<String> splitUID(String uid)
      throws IllegalArgumentException {
-    return TimelineReaderUtils.split(uid, UID_DELIMITER_CHAR, UID_ESCAPE_CHAR);
+    return TimelineReaderUtils.split(uid);
  }

  /**
-   * Join different parts of UID delimited by {@link #UID_DELIMITER_CHAR} with
-   * delimiter and escape character escaped using {@link #UID_ESCAPE_CHAR} if
-   * UID parts contain them.
+   * Join different parts of UID delimited by
+   * {@link TimelineReaderUtils#DEFAULT_DELIMITER_CHAR} with delimiter and
+   * escape character escaped using
+   * {@link TimelineReaderUtils#DEFAULT_ESCAPE_CHAR} if UID parts contain them.
   * @param parts an array of UID parts to be joined.
   * @return a string joined using the delimiter with escape and delimiter
-   *     characters escaped if they are part of the string parts to be joined.
-   *     Returns null if one of the parts is null.
+   *     characters escaped if they are part of the string parts to be
+   *     joined. Returns null if one of the parts is null.
   */
  private static String joinAndEscapeUIDParts(String[] parts) {
-    return TimelineReaderUtils.joinAndEscapeStrings(parts, UID_DELIMITER_CHAR,
-        UID_ESCAPE_CHAR);
+    return TimelineReaderUtils.joinAndEscapeStrings(parts);
  }

  /**
/**