YARN-6027. Support fromid(offset) filter for /flows API (Rohith Sharma K S via Varun Saxena)
(cherry picked from commit 63c06ec44e633567c378e28898e319143593ff30) Conflicts: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
This commit is contained in:
parent
6b5b8f090d
commit
d996afd585
|
@ -0,0 +1,176 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.reader;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
|
import java.net.HttpURLConnection;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import javax.ws.rs.core.MediaType;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest;
|
||||||
|
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
||||||
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
import com.sun.jersey.api.client.Client;
|
||||||
|
import com.sun.jersey.api.client.ClientResponse;
|
||||||
|
import com.sun.jersey.api.client.ClientResponse.Status;
|
||||||
|
import com.sun.jersey.api.client.GenericType;
|
||||||
|
import com.sun.jersey.api.client.config.ClientConfig;
|
||||||
|
import com.sun.jersey.api.client.config.DefaultClientConfig;
|
||||||
|
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
|
||||||
|
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test Base for TimelineReaderServer HBase tests.
|
||||||
|
*/
|
||||||
|
public abstract class AbstractTimelineReaderHBaseTestBase {
|
||||||
|
private static int serverPort;
|
||||||
|
private static TimelineReaderServer server;
|
||||||
|
private static HBaseTestingUtility util;
|
||||||
|
|
||||||
|
public static void setup() throws Exception {
|
||||||
|
util = new HBaseTestingUtility();
|
||||||
|
Configuration conf = util.getConfiguration();
|
||||||
|
conf.setInt("hfile.format.version", 3);
|
||||||
|
util.startMiniCluster();
|
||||||
|
DataGeneratorForTest.createSchema(util.getConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
if (server != null) {
|
||||||
|
server.stop();
|
||||||
|
server = null;
|
||||||
|
}
|
||||||
|
if (util != null) {
|
||||||
|
util.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static void initialize() throws Exception {
|
||||||
|
try {
|
||||||
|
Configuration config = util.getConfiguration();
|
||||||
|
config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
|
config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||||
|
config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
|
||||||
|
"localhost:0");
|
||||||
|
config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
|
||||||
|
config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
|
||||||
|
"org.apache.hadoop.yarn.server.timelineservice.storage."
|
||||||
|
+ "HBaseTimelineReaderImpl");
|
||||||
|
config.setInt("hfile.format.version", 3);
|
||||||
|
server = new TimelineReaderServer() {
|
||||||
|
@Override
|
||||||
|
protected void setupOptions(Configuration conf) {
|
||||||
|
// The parent code tries to use HttpServer2 from this version of
|
||||||
|
// Hadoop, but the tests are loading in HttpServer2 from
|
||||||
|
// ${hbase-compatible-hadoop.version}. This version uses Jetty 9
|
||||||
|
// while ${hbase-compatible-hadoop.version} uses Jetty 6, and there
|
||||||
|
// are many differences, including classnames and packages.
|
||||||
|
// We do nothing here, so that we don't cause a NoSuchMethodError.
|
||||||
|
// Once ${hbase-compatible-hadoop.version} is changed to Hadoop 3,
|
||||||
|
// we should be able to remove this @Override.
|
||||||
|
}
|
||||||
|
};
|
||||||
|
server.init(config);
|
||||||
|
server.start();
|
||||||
|
serverPort = server.getWebServerPort();
|
||||||
|
} catch (Exception e) {
|
||||||
|
Assert.fail("Web server failed to start");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Client createClient() {
|
||||||
|
ClientConfig cfg = new DefaultClientConfig();
|
||||||
|
cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
|
||||||
|
return new Client(
|
||||||
|
new URLConnectionClientHandler(new DummyURLConnectionFactory()), cfg);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ClientResponse getResponse(Client client, URI uri)
|
||||||
|
throws Exception {
|
||||||
|
ClientResponse resp =
|
||||||
|
client.resource(uri).accept(MediaType.APPLICATION_JSON)
|
||||||
|
.type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||||
|
if (resp == null || resp.getStatusInfo()
|
||||||
|
.getStatusCode() != ClientResponse.Status.OK.getStatusCode()) {
|
||||||
|
String msg = "";
|
||||||
|
if (resp != null) {
|
||||||
|
msg = String.valueOf(resp.getStatusInfo().getStatusCode());
|
||||||
|
}
|
||||||
|
throw new IOException(
|
||||||
|
"Incorrect response from timeline reader. " + "Status=" + msg);
|
||||||
|
}
|
||||||
|
return resp;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void verifyHttpResponse(Client client, URI uri, Status status) {
|
||||||
|
ClientResponse resp =
|
||||||
|
client.resource(uri).accept(MediaType.APPLICATION_JSON)
|
||||||
|
.type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||||
|
assertNotNull(resp);
|
||||||
|
assertTrue("Response from server should have been " + status,
|
||||||
|
resp.getStatusInfo().getStatusCode() == status.getStatusCode());
|
||||||
|
System.out.println("Response is: " + resp.getEntity(String.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<FlowActivityEntity> verifyFlowEntites(Client client, URI uri,
|
||||||
|
int noOfEntities) throws Exception {
|
||||||
|
ClientResponse resp = getResponse(client, uri);
|
||||||
|
List<FlowActivityEntity> entities =
|
||||||
|
resp.getEntity(new GenericType<List<FlowActivityEntity>>() {
|
||||||
|
});
|
||||||
|
assertNotNull(entities);
|
||||||
|
assertEquals(noOfEntities, entities.size());
|
||||||
|
return entities;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static class DummyURLConnectionFactory
|
||||||
|
implements HttpURLConnectionFactory {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HttpURLConnection getHttpURLConnection(final URL url)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
return (HttpURLConnection) url.openConnection();
|
||||||
|
} catch (UndeclaredThrowableException e) {
|
||||||
|
throw new IOException(e.getCause());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static HBaseTestingUtility getHBaseTestingUtility() {
|
||||||
|
return util;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int getServerPort() {
|
||||||
|
return serverPort;
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,38 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interface which has to be implemented for encoding and decoding row keys or
|
||||||
|
* column qualifiers as string.
|
||||||
|
*/
|
||||||
|
public interface KeyConverterToString<T> {
|
||||||
|
/**
|
||||||
|
* Encode key as string.
|
||||||
|
* @param key of type T to be encoded as string.
|
||||||
|
* @return encoded value as string.
|
||||||
|
*/
|
||||||
|
String encodeAsString(T key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decode row key from string to a key of type T.
|
||||||
|
* @param encodedKey string representation of row key
|
||||||
|
* @return type T which has been constructed after decoding string.
|
||||||
|
*/
|
||||||
|
T decodeFromString(String encodedKey);
|
||||||
|
}
|
|
@ -17,10 +17,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
|
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -32,8 +36,8 @@ public class FlowActivityRowKey {
|
||||||
private final Long dayTs;
|
private final Long dayTs;
|
||||||
private final String userId;
|
private final String userId;
|
||||||
private final String flowName;
|
private final String flowName;
|
||||||
private final KeyConverter<FlowActivityRowKey> flowActivityRowKeyConverter =
|
private final FlowActivityRowKeyConverter
|
||||||
new FlowActivityRowKeyConverter();
|
flowActivityRowKeyConverter = new FlowActivityRowKeyConverter();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param clusterId identifying the cluster
|
* @param clusterId identifying the cluster
|
||||||
|
@ -103,14 +107,33 @@ public class FlowActivityRowKey {
|
||||||
return new FlowActivityRowKeyConverter().decode(rowKey);
|
return new FlowActivityRowKeyConverter().decode(rowKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a row key for the flow activity table as follows:
|
||||||
|
* {@code clusterId!dayTimestamp!user!flowName}.
|
||||||
|
* @return String representation of row key
|
||||||
|
*/
|
||||||
|
public String getRowKeyAsString() {
|
||||||
|
return flowActivityRowKeyConverter.encodeAsString(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given the raw row key as string, returns the row key as an object.
|
||||||
|
* @param encodedRowKey String representation of row key.
|
||||||
|
* @return A <cite>FlowActivityRowKey</cite> object.
|
||||||
|
*/
|
||||||
|
public static FlowActivityRowKey parseRowKeyFromString(String encodedRowKey) {
|
||||||
|
return new FlowActivityRowKeyConverter().decodeFromString(encodedRowKey);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encodes and decodes row key for flow activity table. The row key is of the
|
* Encodes and decodes row key for flow activity table. The row key is of the
|
||||||
* form : clusterId!dayTimestamp!user!flowName. dayTimestamp(top of the day
|
* form : clusterId!dayTimestamp!user!flowName. dayTimestamp(top of the day
|
||||||
* timestamp) is a long and rest are strings.
|
* timestamp) is a long and rest are strings.
|
||||||
* <p>
|
* <p>
|
||||||
*/
|
*/
|
||||||
final private static class FlowActivityRowKeyConverter implements
|
final private static class FlowActivityRowKeyConverter
|
||||||
KeyConverter<FlowActivityRowKey> {
|
implements KeyConverter<FlowActivityRowKey>,
|
||||||
|
KeyConverterToString<FlowActivityRowKey> {
|
||||||
|
|
||||||
private FlowActivityRowKeyConverter() {
|
private FlowActivityRowKeyConverter() {
|
||||||
}
|
}
|
||||||
|
@ -192,5 +215,33 @@ public class FlowActivityRowKey {
|
||||||
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
|
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
|
||||||
return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
|
return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String encodeAsString(FlowActivityRowKey key) {
|
||||||
|
if (key.getDayTimestamp() == null) {
|
||||||
|
return TimelineReaderUtils
|
||||||
|
.joinAndEscapeStrings(new String[] {key.clusterId});
|
||||||
|
} else if (key.getUserId() == null) {
|
||||||
|
return TimelineReaderUtils.joinAndEscapeStrings(
|
||||||
|
new String[] {key.clusterId, key.dayTs.toString()});
|
||||||
|
} else if (key.getFlowName() == null) {
|
||||||
|
return TimelineReaderUtils.joinAndEscapeStrings(
|
||||||
|
new String[] {key.clusterId, key.dayTs.toString(), key.userId});
|
||||||
|
}
|
||||||
|
return TimelineReaderUtils.joinAndEscapeStrings(new String[] {
|
||||||
|
key.clusterId, key.dayTs.toString(), key.userId, key.flowName});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FlowActivityRowKey decodeFromString(String encodedRowKey) {
|
||||||
|
List<String> split = TimelineReaderUtils.split(encodedRowKey);
|
||||||
|
if (split == null || split.size() != 4) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Invalid row key for flow activity.");
|
||||||
|
}
|
||||||
|
Long dayTs = Long.valueOf(split.get(1));
|
||||||
|
return new FlowActivityRowKey(split.get(0), dayTs, split.get(2),
|
||||||
|
split.get(3));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
|
||||||
|
@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityCo
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
|
||||||
|
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
@ -110,11 +112,30 @@ class FlowActivityEntityReader extends TimelineEntityReader {
|
||||||
Connection conn, FilterList filterList) throws IOException {
|
Connection conn, FilterList filterList) throws IOException {
|
||||||
Scan scan = new Scan();
|
Scan scan = new Scan();
|
||||||
String clusterId = getContext().getClusterId();
|
String clusterId = getContext().getClusterId();
|
||||||
if (getFilters().getCreatedTimeBegin() == 0L &&
|
if (getFilters().getFromId() == null
|
||||||
getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) {
|
&& getFilters().getCreatedTimeBegin() == 0L
|
||||||
|
&& getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) {
|
||||||
// All records have to be chosen.
|
// All records have to be chosen.
|
||||||
scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId)
|
scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId)
|
||||||
.getRowKeyPrefix());
|
.getRowKeyPrefix());
|
||||||
|
} else if (getFilters().getFromId() != null) {
|
||||||
|
FlowActivityRowKey key = null;
|
||||||
|
try {
|
||||||
|
key =
|
||||||
|
FlowActivityRowKey.parseRowKeyFromString(getFilters().getFromId());
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
throw new BadRequestException("Invalid filter fromid is provided.");
|
||||||
|
}
|
||||||
|
if (!clusterId.equals(key.getClusterId())) {
|
||||||
|
throw new BadRequestException(
|
||||||
|
"fromid doesn't belong to clusterId=" + clusterId);
|
||||||
|
}
|
||||||
|
scan.setStartRow(key.getRowKey());
|
||||||
|
scan.setStopRow(
|
||||||
|
new FlowActivityRowKeyPrefix(clusterId,
|
||||||
|
(getFilters().getCreatedTimeBegin() <= 0 ? 0
|
||||||
|
: (getFilters().getCreatedTimeBegin() - 1)))
|
||||||
|
.getRowKeyPrefix());
|
||||||
} else {
|
} else {
|
||||||
scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters()
|
scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters()
|
||||||
.getCreatedTimeEnd()).getRowKeyPrefix());
|
.getCreatedTimeEnd()).getRowKeyPrefix());
|
||||||
|
@ -157,7 +178,8 @@ class FlowActivityEntityReader extends TimelineEntityReader {
|
||||||
flowRun.setId(flowRun.getId());
|
flowRun.setId(flowRun.getId());
|
||||||
flowActivity.addFlowRun(flowRun);
|
flowActivity.addFlowRun(flowRun);
|
||||||
}
|
}
|
||||||
|
flowActivity.getInfo().put(TimelineReaderUtils.FROMID_KEY,
|
||||||
|
rowKey.getRowKeyAsString());
|
||||||
return flowActivity;
|
return flowActivity;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
|
||||||
|
@ -224,6 +225,26 @@ public class TestRowKeys {
|
||||||
verifyRowPrefixBytes(byteRowKeyPrefix);
|
verifyRowPrefixBytes(byteRowKeyPrefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFlowActivityRowKeyAsString() {
|
||||||
|
String cluster = "cl" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR + "uster"
|
||||||
|
+ TimelineReaderUtils.DEFAULT_ESCAPE_CHAR;
|
||||||
|
String user = TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "user";
|
||||||
|
String fName = "dummy_" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR
|
||||||
|
+ TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "flow"
|
||||||
|
+ TimelineReaderUtils.DEFAULT_DELIMITER_CHAR;
|
||||||
|
Long ts = 1459900830000L;
|
||||||
|
Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
|
||||||
|
String rowKeyAsString =
|
||||||
|
new FlowActivityRowKey(cluster, ts, user, fName).getRowKeyAsString();
|
||||||
|
FlowActivityRowKey rowKey =
|
||||||
|
FlowActivityRowKey.parseRowKeyFromString(rowKeyAsString);
|
||||||
|
assertEquals(cluster, rowKey.getClusterId());
|
||||||
|
assertEquals(dayTimestamp, rowKey.getDayTimestamp());
|
||||||
|
assertEquals(user, rowKey.getUserId());
|
||||||
|
assertEquals(fName, rowKey.getFlowName());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFlowRunRowKey() {
|
public void testFlowRunRowKey() {
|
||||||
byte[] byteRowKey =
|
byte[] byteRowKey =
|
||||||
|
|
|
@ -175,7 +175,7 @@ public class TimelineReaderServer extends CompositeService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
int getWebServerPort() {
|
public int getWebServerPort() {
|
||||||
return readerWebServer.getConnectorAddress(0).getPort();
|
return readerWebServer.getConnectorAddress(0).getPort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,13 +24,29 @@ import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set of utility methods to be used across timeline reader.
|
* Set of utility methods to be used across timeline reader.
|
||||||
*/
|
*/
|
||||||
final class TimelineReaderUtils {
|
public final class TimelineReaderUtils {
|
||||||
private TimelineReaderUtils() {
|
private TimelineReaderUtils() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default delimiter for joining strings.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public static final char DEFAULT_DELIMITER_CHAR = '!';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default escape character used for joining strings.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public static final char DEFAULT_ESCAPE_CHAR = '*';
|
||||||
|
|
||||||
|
public static final String FROMID_KEY = "FROM_ID";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Split the passed string along the passed delimiter character while looking
|
* Split the passed string along the passed delimiter character while looking
|
||||||
* for escape char to interpret the splitted parts correctly. For delimiter or
|
* for escape char to interpret the splitted parts correctly. For delimiter or
|
||||||
|
@ -168,4 +184,14 @@ final class TimelineReaderUtils {
|
||||||
// Join the strings after they have been escaped.
|
// Join the strings after they have been escaped.
|
||||||
return StringUtils.join(strs, delimiterChar);
|
return StringUtils.join(strs, delimiterChar);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static List<String> split(final String str)
|
||||||
|
throws IllegalArgumentException {
|
||||||
|
return split(str, DEFAULT_DELIMITER_CHAR, DEFAULT_ESCAPE_CHAR);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String joinAndEscapeStrings(final String[] strs) {
|
||||||
|
return joinAndEscapeStrings(strs, DEFAULT_DELIMITER_CHAR,
|
||||||
|
DEFAULT_ESCAPE_CHAR);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1333,6 +1333,10 @@ public class TimelineReaderWebServices {
|
||||||
* 2 dates.
|
* 2 dates.
|
||||||
* "daterange=20150711-" returns flows active on and after 20150711.
|
* "daterange=20150711-" returns flows active on and after 20150711.
|
||||||
* "daterange=-20150711" returns flows active on and before 20150711.
|
* "daterange=-20150711" returns flows active on and before 20150711.
|
||||||
|
* @param fromId If specified, retrieve the next set of flows from the given
|
||||||
|
* fromId. The set of flows retrieved is inclusive of specified fromId.
|
||||||
|
* fromId should be taken from the value associated with FROM_ID info key
|
||||||
|
* in flow entity response which was sent earlier.
|
||||||
*
|
*
|
||||||
* @return If successful, a HTTP 200(OK) response having a JSON representing a
|
* @return If successful, a HTTP 200(OK) response having a JSON representing a
|
||||||
* set of <cite>FlowActivityEntity</cite> instances are returned.<br>
|
* set of <cite>FlowActivityEntity</cite> instances are returned.<br>
|
||||||
|
@ -1349,8 +1353,9 @@ public class TimelineReaderWebServices {
|
||||||
@Context HttpServletRequest req,
|
@Context HttpServletRequest req,
|
||||||
@Context HttpServletResponse res,
|
@Context HttpServletResponse res,
|
||||||
@QueryParam("limit") String limit,
|
@QueryParam("limit") String limit,
|
||||||
@QueryParam("daterange") String dateRange) {
|
@QueryParam("daterange") String dateRange,
|
||||||
return getFlows(req, res, null, limit, dateRange);
|
@QueryParam("fromid") String fromId) {
|
||||||
|
return getFlows(req, res, null, limit, dateRange, fromId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1379,6 +1384,10 @@ public class TimelineReaderWebServices {
|
||||||
* 2 dates.
|
* 2 dates.
|
||||||
* "daterange=20150711-" returns flows active on and after 20150711.
|
* "daterange=20150711-" returns flows active on and after 20150711.
|
||||||
* "daterange=-20150711" returns flows active on and before 20150711.
|
* "daterange=-20150711" returns flows active on and before 20150711.
|
||||||
|
* @param fromId If specified, retrieve the next set of flows from the given
|
||||||
|
* fromId. The set of flows retrieved is inclusive of specified fromId.
|
||||||
|
* fromId should be taken from the value associated with FROM_ID info key
|
||||||
|
* in flow entity response which was sent earlier.
|
||||||
*
|
*
|
||||||
* @return If successful, a HTTP 200(OK) response having a JSON representing a
|
* @return If successful, a HTTP 200(OK) response having a JSON representing a
|
||||||
* set of <cite>FlowActivityEntity</cite> instances are returned.<br>
|
* set of <cite>FlowActivityEntity</cite> instances are returned.<br>
|
||||||
|
@ -1396,7 +1405,8 @@ public class TimelineReaderWebServices {
|
||||||
@Context HttpServletResponse res,
|
@Context HttpServletResponse res,
|
||||||
@PathParam("clusterid") String clusterId,
|
@PathParam("clusterid") String clusterId,
|
||||||
@QueryParam("limit") String limit,
|
@QueryParam("limit") String limit,
|
||||||
@QueryParam("daterange") String dateRange) {
|
@QueryParam("daterange") String dateRange,
|
||||||
|
@QueryParam("fromid") String fromId) {
|
||||||
String url = req.getRequestURI() +
|
String url = req.getRequestURI() +
|
||||||
(req.getQueryString() == null ? "" :
|
(req.getQueryString() == null ? "" :
|
||||||
QUERY_STRING_SEP + req.getQueryString());
|
QUERY_STRING_SEP + req.getQueryString());
|
||||||
|
@ -1413,7 +1423,7 @@ public class TimelineReaderWebServices {
|
||||||
TimelineEntityFilters entityFilters =
|
TimelineEntityFilters entityFilters =
|
||||||
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
|
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
|
||||||
limit, null, null, null, null, null, null, null, null, null,
|
limit, null, null, null, null, null, null, null, null, null,
|
||||||
null);
|
fromId);
|
||||||
entityFilters.setCreatedTimeBegin(range.dateStart);
|
entityFilters.setCreatedTimeBegin(range.dateStart);
|
||||||
entityFilters.setCreatedTimeEnd(range.dateEnd);
|
entityFilters.setCreatedTimeEnd(range.dateEnd);
|
||||||
entities = timelineReaderManager.getEntities(
|
entities = timelineReaderManager.getEntities(
|
||||||
|
|
|
@ -195,39 +195,29 @@ enum TimelineUIDConverter {
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delimiter used for UID.
|
* Split UID using {@link TimelineReaderUtils#DEFAULT_DELIMITER_CHAR} and
|
||||||
*/
|
* {@link TimelineReaderUtils#DEFAULT_ESCAPE_CHAR}.
|
||||||
public static final char UID_DELIMITER_CHAR = '!';
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Escape Character used if delimiter or escape character itself is part of
|
|
||||||
* different components of UID.
|
|
||||||
*/
|
|
||||||
public static final char UID_ESCAPE_CHAR = '*';
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Split UID using {@link #UID_DELIMITER_CHAR} and {@link #UID_ESCAPE_CHAR}.
|
|
||||||
* @param uid UID to be splitted.
|
* @param uid UID to be splitted.
|
||||||
* @return a list of different parts of UID split across delimiter.
|
* @return a list of different parts of UID split across delimiter.
|
||||||
* @throws IllegalArgumentException if UID is not properly escaped.
|
* @throws IllegalArgumentException if UID is not properly escaped.
|
||||||
*/
|
*/
|
||||||
private static List<String> splitUID(String uid)
|
private static List<String> splitUID(String uid)
|
||||||
throws IllegalArgumentException {
|
throws IllegalArgumentException {
|
||||||
return TimelineReaderUtils.split(uid, UID_DELIMITER_CHAR, UID_ESCAPE_CHAR);
|
return TimelineReaderUtils.split(uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Join different parts of UID delimited by {@link #UID_DELIMITER_CHAR} with
|
* Join different parts of UID delimited by
|
||||||
* delimiter and escape character escaped using {@link #UID_ESCAPE_CHAR} if
|
* {@link TimelineReaderUtils#DEFAULT_DELIMITER_CHAR} with delimiter and
|
||||||
* UID parts contain them.
|
* escape character escaped using
|
||||||
|
* {@link TimelineReaderUtils#DEFAULT_ESCAPE_CHAR} if UID parts contain them.
|
||||||
* @param parts an array of UID parts to be joined.
|
* @param parts an array of UID parts to be joined.
|
||||||
* @return a string joined using the delimiter with escape and delimiter
|
* @return a string joined using the delimiter with escape and delimiter
|
||||||
* characters escaped if they are part of the string parts to be joined.
|
* characters escaped if they are part of the string parts to be
|
||||||
* Returns null if one of the parts is null.
|
* joined. Returns null if one of the parts is null.
|
||||||
*/
|
*/
|
||||||
private static String joinAndEscapeUIDParts(String[] parts) {
|
private static String joinAndEscapeUIDParts(String[] parts) {
|
||||||
return TimelineReaderUtils.joinAndEscapeStrings(parts, UID_DELIMITER_CHAR,
|
return TimelineReaderUtils.joinAndEscapeStrings(parts);
|
||||||
UID_ESCAPE_CHAR);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue