diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/AbstractTimelineReaderHBaseTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/AbstractTimelineReaderHBaseTestBase.java new file mode 100644 index 00000000000..7853c948aeb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/AbstractTimelineReaderHBaseTestBase.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.util.List; + +import javax.ws.rs.core.MediaType; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.junit.Assert; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.ClientResponse.Status; +import com.sun.jersey.api.client.GenericType; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; +import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; + +/** + * Test Base for TimelineReaderServer HBase tests. + */ +public abstract class AbstractTimelineReaderHBaseTestBase { + private static int serverPort; + private static TimelineReaderServer server; + private static HBaseTestingUtility util; + + public static void setup() throws Exception { + util = new HBaseTestingUtility(); + Configuration conf = util.getConfiguration(); + conf.setInt("hfile.format.version", 3); + util.startMiniCluster(); + DataGeneratorForTest.createSchema(util.getConfiguration()); + } + + public static void tearDown() throws Exception { + if (server != null) { + server.stop(); + server = null; + } + if (util != null) { + util.shutdownMiniCluster(); + } + } + + protected static void initialize() throws Exception { + try { + Configuration config = util.getConfiguration(); + config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + "localhost:0"); + config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); + config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, + "org.apache.hadoop.yarn.server.timelineservice.storage." + + "HBaseTimelineReaderImpl"); + config.setInt("hfile.format.version", 3); + server = new TimelineReaderServer() { + @Override + protected void setupOptions(Configuration conf) { + // The parent code tries to use HttpServer2 from this version of + // Hadoop, but the tests are loading in HttpServer2 from + // ${hbase-compatible-hadoop.version}. This version uses Jetty 9 + // while ${hbase-compatible-hadoop.version} uses Jetty 6, and there + // are many differences, including classnames and packages. + // We do nothing here, so that we don't cause a NoSuchMethodError. + // Once ${hbase-compatible-hadoop.version} is changed to Hadoop 3, + // we should be able to remove this @Override. + } + }; + server.init(config); + server.start(); + serverPort = server.getWebServerPort(); + } catch (Exception e) { + Assert.fail("Web server failed to start"); + } + } + + protected Client createClient() { + ClientConfig cfg = new DefaultClientConfig(); + cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class); + return new Client( + new URLConnectionClientHandler(new DummyURLConnectionFactory()), cfg); + } + + protected ClientResponse getResponse(Client client, URI uri) + throws Exception { + ClientResponse resp = + client.resource(uri).accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); + if (resp == null || resp.getStatusInfo() + .getStatusCode() != ClientResponse.Status.OK.getStatusCode()) { + String msg = ""; + if (resp != null) { + msg = String.valueOf(resp.getStatusInfo().getStatusCode()); + } + throw new IOException( + "Incorrect response from timeline reader. " + "Status=" + msg); + } + return resp; + } + + protected void verifyHttpResponse(Client client, URI uri, Status status) { + ClientResponse resp = + client.resource(uri).accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertNotNull(resp); + assertTrue("Response from server should have been " + status, + resp.getStatusInfo().getStatusCode() == status.getStatusCode()); + System.out.println("Response is: " + resp.getEntity(String.class)); + } + + protected List verifyFlowEntites(Client client, URI uri, + int noOfEntities) throws Exception { + ClientResponse resp = getResponse(client, uri); + List entities = + resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(noOfEntities, entities.size()); + return entities; + } + + protected static class DummyURLConnectionFactory + implements HttpURLConnectionFactory { + + @Override + public HttpURLConnection getHttpURLConnection(final URL url) + throws IOException { + try { + return (HttpURLConnection) url.openConnection(); + } catch (UndeclaredThrowableException e) { + throw new IOException(e.getCause()); + } + } + } + + protected static HBaseTestingUtility getHBaseTestingUtility() { + return util; + } + + public static int getServerPort() { + return serverPort; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index d9a7078c290..6386183b530 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -24,10 +24,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.HttpURLConnection; import java.net.URI; -import java.net.URL; import java.text.DateFormat; import java.util.ArrayList; import java.util.HashMap; @@ -39,7 +36,6 @@ import java.util.Set; import javax.ws.rs.core.MediaType; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; @@ -50,17 +46,11 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; -import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -70,27 +60,26 @@ import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse.Status; import com.sun.jersey.api.client.GenericType; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; -import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; -public class TestTimelineReaderWebServicesHBaseStorage { - private int serverPort; - private TimelineReaderServer server; - private static HBaseTestingUtility util; +/** + * Test TimelineReder Web Service REST API's. + */ +public class TestTimelineReaderWebServicesHBaseStorage + extends AbstractTimelineReaderHBaseTestBase { private static long ts = System.currentTimeMillis(); private static long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts); @BeforeClass - public static void setup() throws Exception { - util = new HBaseTestingUtility(); - Configuration conf = util.getConfiguration(); - conf.setInt("hfile.format.version", 3); - util.startMiniCluster(); - DataGeneratorForTest.createSchema(conf); + public static void setupBeforeClass() throws Exception { + setup(); loadData(); + initialize(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + tearDown(); } private static void loadData() throws Exception { @@ -344,7 +333,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } HBaseTimelineWriterImpl hbi = null; - Configuration c1 = util.getConfiguration(); + Configuration c1 = getHBaseTestingUtility().getConfiguration(); try { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); @@ -393,71 +382,6 @@ public class TestTimelineReaderWebServicesHBaseStorage { } } - @AfterClass - public static void tearDown() throws Exception { - util.shutdownMiniCluster(); - } - - @Before - public void init() throws Exception { - try { - Configuration config = util.getConfiguration(); - config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); - config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - "localhost:0"); - config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); - config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, - "org.apache.hadoop.yarn.server.timelineservice.storage." + - "HBaseTimelineReaderImpl"); - config.setInt("hfile.format.version", 3); - server = new TimelineReaderServer(); - server.init(config); - server.start(); - serverPort = server.getWebServerPort(); - } catch (Exception e) { - Assert.fail("Web server failed to start"); - } - } - - private static Client createClient() { - ClientConfig cfg = new DefaultClientConfig(); - cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class); - return new Client(new URLConnectionClientHandler( - new DummyURLConnectionFactory()), cfg); - } - - private static ClientResponse getResponse(Client client, URI uri) - throws Exception { - ClientResponse resp = - client.resource(uri).accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); - if (resp == null || - resp.getClientResponseStatus() != ClientResponse.Status.OK) { - String msg = ""; - if (resp != null) { - msg = String.valueOf(resp.getClientResponseStatus()); - } - throw new IOException("Incorrect response from timeline reader. " + - "Status=" + msg); - } - return resp; - } - - private static class DummyURLConnectionFactory - implements HttpURLConnectionFactory { - - @Override - public HttpURLConnection getHttpURLConnection(final URL url) - throws IOException { - try { - return (HttpURLConnection)url.openConnection(); - } catch (UndeclaredThrowableException e) { - throw new IOException(e.getCause()); - } - } - } - private static TimelineEntity newEntity(String type, String id) { TimelineEntity entity = new TimelineEntity(); entity.setIdentifier(new TimelineEntity.Identifier(type, id)); @@ -499,22 +423,11 @@ public class TestTimelineReaderWebServicesHBaseStorage { return false; } - private static void verifyHttpResponse(Client client, URI uri, - Status status) { - ClientResponse resp = - client.resource(uri).accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertNotNull(resp); - assertTrue("Response from server should have been " + status, - resp.getClientResponseStatus() == status); - System.out.println("Response is: " + resp.getEntity(String.class)); - } - @Test public void testGetFlowRun() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" + "1002345678919"); ClientResponse resp = getResponse(client, uri); @@ -534,7 +447,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } // Query without specifying cluster ID. - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/users/user1/flows/flow_name/runs/1002345678919"); resp = getResponse(client, uri); entity = resp.getEntity(FlowRunEntity.class); @@ -559,7 +472,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetFlowRuns() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs"); ClientResponse resp = getResponse(client, uri); Set entities = @@ -578,8 +491,9 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertEquals(0, entity.getMetrics().size()); } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + - "clusters/cluster1/users/user1/flows/flow_name/runs?limit=1"); + uri = + URI.create("http://localhost:" + getServerPort() + "/ws/v2/timeline/" + + "clusters/cluster1/users/user1/flows/flow_name/runs?limit=1"); resp = getResponse(client, uri); entities = resp.getEntity(new GenericType>(){}); assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); @@ -593,7 +507,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertEquals(0, entity.getMetrics().size()); } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + "createdtimestart=1425016501030"); resp = getResponse(client, uri); @@ -609,7 +523,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertEquals(0, entity.getMetrics().size()); } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + "createdtimestart=1425016500999&createdtimeend=1425016501035"); resp = getResponse(client, uri); @@ -628,7 +542,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertEquals(0, entity.getMetrics().size()); } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + "createdtimeend=1425016501030"); resp = getResponse(client, uri); @@ -644,7 +558,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertEquals(0, entity.getMetrics().size()); } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + "fields=metrics"); resp = getResponse(client, uri); @@ -666,7 +580,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // fields as CONFIGS will lead to a HTTP 400 as it makes no sense for // flow runs. - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + "fields=CONFIGS"); verifyHttpResponse(client, uri, Status.BAD_REQUEST); @@ -679,7 +593,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetFlowRunsMetricsToRetrieve() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + "metricstoretrieve=MAP_,HDFS_"); ClientResponse resp = getResponse(client, uri); @@ -698,7 +612,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } assertEquals(3, metricCnt); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + "metricstoretrieve=!(MAP_,HDFS_)"); resp = getResponse(client, uri); @@ -724,7 +638,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { Client client = createClient(); try { // Query all flows. - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/flows"); ClientResponse resp = getResponse(client, uri); Set flowEntities = @@ -750,7 +664,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // Query flowruns based on UID returned in query above. List listFlowRunUIDs = new ArrayList(); for (String flowUID : listFlowUIDs) { - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/flow-uid/" + flowUID + "/runs"); resp = getResponse(client, uri); Set frEntities = @@ -770,7 +684,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // Query single flowrun based on UIDs' returned in query to get flowruns. for (String flowRunUID : listFlowRunUIDs) { - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/run-uid/" + flowRunUID); resp = getResponse(client, uri); FlowRunEntity entity = resp.getEntity(FlowRunEntity.class); @@ -782,7 +696,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { for (String flowRunUID : listFlowRunUIDs) { TimelineReaderContext context = TimelineUIDConverter.FLOWRUN_UID.decodeUID(flowRunUID); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/run-uid/" + flowRunUID + "/apps"); resp = getResponse(client, uri); Set appEntities = @@ -802,7 +716,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // Query single app based on UIDs' returned in query to get apps. for (String appUID : listAppUIDs) { - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/app-uid/" + appUID); resp = getResponse(client, uri); TimelineEntity entity = resp.getEntity(TimelineEntity.class); @@ -815,7 +729,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { for (String appUID : listAppUIDs) { TimelineReaderContext context = TimelineUIDConverter.APPLICATION_UID.decodeUID(appUID); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/app-uid/" + appUID + "/entities/type1"); resp = getResponse(client, uri); Set entities = @@ -837,39 +751,39 @@ public class TestTimelineReaderWebServicesHBaseStorage { // Query single entity based on UIDs' returned in query to get entities. for (String entityUID : listEntityUIDs) { - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/entity-uid/" + entityUID); resp = getResponse(client, uri); TimelineEntity entity = resp.getEntity(TimelineEntity.class); assertNotNull(entity); } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/flow-uid/dummy:flow/runs"); verifyHttpResponse(client, uri, Status.BAD_REQUEST); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/run-uid/dummy:flowrun"); verifyHttpResponse(client, uri, Status.BAD_REQUEST); // Run Id is not a numerical value. - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/run-uid/some:dummy:flow:123v456"); verifyHttpResponse(client, uri, Status.BAD_REQUEST); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/run-uid/dummy:flowrun/apps"); verifyHttpResponse(client, uri, Status.BAD_REQUEST); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/app-uid/dummy:app"); verifyHttpResponse(client, uri, Status.BAD_REQUEST); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/app-uid/dummy:app/entities/type1"); verifyHttpResponse(client, uri, Status.BAD_REQUEST); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/entity-uid/dummy:entity"); verifyHttpResponse(client, uri, Status.BAD_REQUEST); } finally { @@ -883,7 +797,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { try { String appUIDWithFlowInfo = "cluster1!user1!flow_name!1002345678919!application_1111111111_1111"; - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/"+ + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/"+ "timeline/app-uid/" + appUIDWithFlowInfo); ClientResponse resp = getResponse(client, uri); TimelineEntity appEntity1 = resp.getEntity(TimelineEntity.class); @@ -892,8 +806,9 @@ public class TestTimelineReaderWebServicesHBaseStorage { TimelineEntityType.YARN_APPLICATION.toString(), appEntity1.getType()); assertEquals("application_1111111111_1111", appEntity1.getId()); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + - "app-uid/" + appUIDWithFlowInfo + "/entities/type1"); + uri = + URI.create("http://localhost:" + getServerPort() + "/ws/v2/timeline/" + + "app-uid/" + appUIDWithFlowInfo + "/entities/type1"); resp = getResponse(client, uri); Set entities1 = resp.getEntity(new GenericType>(){}); @@ -910,8 +825,8 @@ public class TestTimelineReaderWebServicesHBaseStorage { } String appUIDWithoutFlowInfo = "cluster1!application_1111111111_1111"; - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+ - "app-uid/" + appUIDWithoutFlowInfo); + uri = URI.create("http://localhost:" + getServerPort() + + "/ws/v2/timeline/" + "app-uid/" + appUIDWithoutFlowInfo); resp = getResponse(client, uri); TimelineEntity appEntity2 = resp.getEntity(TimelineEntity.class); assertNotNull(appEntity2); @@ -919,8 +834,9 @@ public class TestTimelineReaderWebServicesHBaseStorage { TimelineEntityType.YARN_APPLICATION.toString(), appEntity2.getType()); assertEquals("application_1111111111_1111", appEntity2.getId()); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + - "app-uid/" + appUIDWithoutFlowInfo + "/entities/type1"); + uri = + URI.create("http://localhost:" + getServerPort() + "/ws/v2/timeline/" + + "app-uid/" + appUIDWithoutFlowInfo + "/entities/type1"); resp = getResponse(client, uri); Set entities2 = resp.getEntity(new GenericType>(){}); @@ -937,8 +853,8 @@ public class TestTimelineReaderWebServicesHBaseStorage { } String entityUIDWithFlowInfo = appUIDWithFlowInfo + "!type1!0!entity1"; - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+ - "entity-uid/" + entityUIDWithFlowInfo); + uri = URI.create("http://localhost:" + getServerPort() + + "/ws/v2/timeline/" + "entity-uid/" + entityUIDWithFlowInfo); resp = getResponse(client, uri); TimelineEntity singleEntity1 = resp.getEntity(TimelineEntity.class); assertNotNull(singleEntity1); @@ -947,8 +863,8 @@ public class TestTimelineReaderWebServicesHBaseStorage { String entityUIDWithoutFlowInfo = appUIDWithoutFlowInfo + "!type1!0!entity1"; - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+ - "entity-uid/" + entityUIDWithoutFlowInfo); + uri = URI.create("http://localhost:" + getServerPort() + + "/ws/v2/timeline/" + "entity-uid/" + entityUIDWithoutFlowInfo); resp = getResponse(client, uri); TimelineEntity singleEntity2 = resp.getEntity(TimelineEntity.class); assertNotNull(singleEntity2); @@ -965,7 +881,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { try { String appUID = "cluster1!user*1!flow_name!1002345678919!application_1111111111_1111"; - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/"+ + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/"+ "timeline/app-uid/" + appUID); verifyHttpResponse(client, uri, Status.BAD_REQUEST); } finally { @@ -977,19 +893,19 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetFlows() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/flows"); verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1}, new String[] {"flow1", "flow_name", "flow_name2"}); // Query without specifying cluster ID. - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/flows/"); verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1}, new String[] {"flow1", "flow_name", "flow_name2"}); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/flows?limit=1"); verifyFlowEntites(client, uri, 1, new int[] {3}, new String[] {"flow1"}); @@ -998,43 +914,43 @@ public class TestTimelineReaderWebServicesHBaseStorage { HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L); DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get(); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=" + fmt.format(firstFlowActivity) + "-" + fmt.format(dayTs)); verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1}, new String[] {"flow1", "flow_name", "flow_name2"}); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=" + fmt.format(dayTs + (4*86400000L))); verifyFlowEntites(client, uri, 0, new int[] {}, new String[] {}); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=-" + fmt.format(dayTs)); verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1}, new String[] {"flow1", "flow_name", "flow_name2"}); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=" + fmt.format(firstFlowActivity) + "-"); verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1}, new String[] {"flow1", "flow_name", "flow_name2"}); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=20150711:20150714"); verifyHttpResponse(client, uri, Status.BAD_REQUEST); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=20150714-20150711"); verifyHttpResponse(client, uri, Status.BAD_REQUEST); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=2015071129-20150712"); verifyHttpResponse(client, uri, Status.BAD_REQUEST); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=20150711-2015071243"); verifyHttpResponse(client, uri, Status.BAD_REQUEST); } finally { @@ -1042,11 +958,48 @@ public class TestTimelineReaderWebServicesHBaseStorage { } } + @Test + public void testGetFlowsForPagination() throws Exception { + Client client = createClient(); + int noOfEntities = 3; + int limit = 2; + try { + String flowURI = "http://localhost:" + getServerPort() + "/ws/v2/" + + "timeline/clusters/cluster1/flows"; + URI uri = URI.create(flowURI); + List flowEntites = + verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1}, + new String[] {"flow1", "flow_name", "flow_name2"}); + FlowActivityEntity fEntity1 = flowEntites.get(0); + FlowActivityEntity fEntity3 = flowEntites.get(noOfEntities - 1); + + uri = URI.create(flowURI + "?limit=" + limit); + flowEntites = verifyFlowEntites(client, uri, limit); + assertEquals(fEntity1, flowEntites.get(0)); + FlowActivityEntity fEntity2 = flowEntites.get(limit - 1); + + uri = URI + .create(flowURI + "?limit=" + limit + "&fromid=" + + fEntity2.getInfo().get(TimelineReaderUtils.FROMID_KEY)); + flowEntites = verifyFlowEntites(client, uri, noOfEntities - limit + 1); + assertEquals(fEntity2, flowEntites.get(0)); + assertEquals(fEntity3, flowEntites.get(noOfEntities - limit)); + + uri = URI + .create(flowURI + "?limit=" + limit + "&fromid=" + + fEntity3.getInfo().get(TimelineReaderUtils.FROMID_KEY)); + flowEntites = verifyFlowEntites(client, uri, 1); + assertEquals(fEntity3, flowEntites.get(0)); + } finally { + client.destroy(); + } + } + @Test public void testGetApp() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111?" + "userid=user1&fields=ALL&flowname=flow_name&flowrunid=1002345678919"); ClientResponse resp = getResponse(client, uri); @@ -1064,7 +1017,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertTrue(verifyMetrics(metric, m1, m2, m3)); } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/apps/application_1111111111_2222?userid=user1" + "&fields=metrics&flowname=flow_name&flowrunid=1002345678919"); resp = getResponse(client, uri); @@ -1086,7 +1039,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetAppWithoutFlowInfo() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111?" + "fields=ALL"); ClientResponse resp = getResponse(client, uri); @@ -1105,7 +1058,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertTrue(verifyMetrics(metric, m1, m2, m3)); } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111?" + "fields=ALL&metricslimit=10"); resp = getResponse(client, uri); @@ -1135,7 +1088,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetEntityWithoutFlowInfo() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1/entity1"); ClientResponse resp = getResponse(client, uri); @@ -1152,7 +1105,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetEntitiesWithoutFlowInfo() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1"); ClientResponse resp = getResponse(client, uri); @@ -1176,7 +1129,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetEntitiesDataToRetrieve() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?confstoretrieve=cfg_"); ClientResponse resp = getResponse(client, uri); @@ -1193,7 +1146,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } assertEquals(2, cfgCnt); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?confstoretrieve=cfg_,config_"); resp = getResponse(client, uri); @@ -1210,7 +1163,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } assertEquals(5, cfgCnt); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?confstoretrieve=!(cfg_,config_)"); resp = getResponse(client, uri); @@ -1226,7 +1179,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } assertEquals(1, cfgCnt); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricstoretrieve=MAP_"); resp = getResponse(client, uri); @@ -1242,7 +1195,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } assertEquals(1, metricCnt); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricstoretrieve=MAP1_,HDFS_"); resp = getResponse(client, uri); @@ -1259,7 +1212,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } assertEquals(3, metricCnt); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricstoretrieve=!(MAP1_,HDFS_)"); resp = getResponse(client, uri); @@ -1284,7 +1237,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetEntitiesConfigFilters() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?conffilters=config_param1%20eq%20value1%20OR%20" + "config_param1%20eq%20value3"); @@ -1298,7 +1251,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { entity.getId().equals("entity2")); } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?conffilters=config_param1%20eq%20value1%20AND" + "%20configuration_param2%20eq%20value2"); @@ -1309,7 +1262,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // conffilters=(config_param1 eq value1 AND configuration_param2 eq // value2) OR (config_param1 eq value3 AND cfg_param3 eq value1) - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?conffilters=(config_param1%20eq%20value1%20AND" + "%20configuration_param2%20eq%20value2)%20OR%20(config_param1%20eq" + @@ -1327,7 +1280,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // conffilters=(config_param1 eq value1 AND configuration_param2 eq // value2) OR (config_param1 eq value3 AND cfg_param3 eq value1) - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?conffilters=(config_param1%20eq%20value1%20AND" + "%20configuration_param2%20eq%20value2)%20OR%20(config_param1%20eq" + @@ -1343,7 +1296,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } assertEquals(3, cfgCnt); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?conffilters=(config_param1%20eq%20value1%20AND" + "%20configuration_param2%20eq%20value2)%20OR%20(config_param1%20eq" + @@ -1369,7 +1322,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // entity1. For ne, both entity1 and entity2 will be returned. For ene, // only entity2 will be returned as we are checking for existence too. // conffilters=configuration_param2 ne value3 - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?conffilters=configuration_param2%20ne%20value3"); resp = getResponse(client, uri); @@ -1381,7 +1334,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { entity.getId().equals("entity2")); } // conffilters=configuration_param2 ene value3 - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?conffilters=configuration_param2%20ene%20value3"); resp = getResponse(client, uri); @@ -1401,7 +1354,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { Client client = createClient(); try { // infofilters=info1 eq cluster1 OR info1 eq cluster2 - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?infofilters=info1%20eq%20cluster1%20OR%20info1%20eq" + "%20cluster2"); @@ -1416,7 +1369,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } // infofilters=info1 eq cluster1 AND info4 eq 35000 - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?infofilters=info1%20eq%20cluster1%20AND%20info4%20" + "eq%2035000"); @@ -1426,7 +1379,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertEquals(0, entities.size()); // infofilters=info4 eq 35000 OR info4 eq 36000 - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?infofilters=info4%20eq%2035000%20OR%20info4%20eq" + "%2036000"); @@ -1441,7 +1394,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // infofilters=(info1 eq cluster1 AND info4 eq 35000) OR // (info1 eq cluster2 AND info2 eq 2.0) - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?infofilters=(info1%20eq%20cluster1%20AND%20info4%20" + "eq%2035000)%20OR%20(info1%20eq%20cluster2%20AND%20info2%20eq%202.0" + @@ -1460,7 +1413,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // infofilters=(info1 eq cluster1 AND info4 eq 35000) OR // (info1 eq cluster2 AND info2 eq 2.0) - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?infofilters=(info1%20eq%20cluster1%20AND%20info4%20" + "eq%2035000)%20OR%20(info1%20eq%20cluster2%20AND%20info2%20eq%20" + @@ -1482,7 +1435,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // both entity1 and entity2 will be returned. For ene, only entity2 will // be returned as we are checking for existence too. // infofilters=info3 ne 39000 - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?infofilters=info3%20ne%2039000"); resp = getResponse(client, uri); @@ -1494,7 +1447,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { entity.getId().equals("entity2")); } // infofilters=info3 ene 39000 - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?infofilters=info3%20ene%2039000"); resp = getResponse(client, uri); @@ -1514,7 +1467,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { Client client = createClient(); try { // metricfilters=HDFS_BYTES_READ lt 60 OR HDFS_BYTES_READ eq 157 - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricfilters=HDFS_BYTES_READ%20lt%2060%20OR%20" + "HDFS_BYTES_READ%20eq%20157"); @@ -1529,7 +1482,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } // metricfilters=HDFS_BYTES_READ lt 60 AND MAP_SLOT_MILLIS gt 40 - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricfilters=HDFS_BYTES_READ%20lt%2060%20AND%20" + "MAP_SLOT_MILLIS%20gt%2040"); @@ -1540,7 +1493,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // metricfilters=(HDFS_BYTES_READ lt 60 AND MAP_SLOT_MILLIS gt 40) OR // (MAP1_SLOT_MILLIS ge 140 AND MAP11_SLOT_MILLIS le 122) - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricfilters=(HDFS_BYTES_READ%20lt%2060%20AND%20" + "MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" + @@ -1558,7 +1511,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // metricfilters=(HDFS_BYTES_READ lt 60 AND MAP_SLOT_MILLIS gt 40) OR // (MAP1_SLOT_MILLIS ge 140 AND MAP11_SLOT_MILLIS le 122) - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricfilters=(HDFS_BYTES_READ%20lt%2060%20AND%20" + "MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" + @@ -1576,7 +1529,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // metricfilters=(HDFS_BYTES_READ lt 60 AND MAP_SLOT_MILLIS gt 40) OR // (MAP1_SLOT_MILLIS ge 140 AND MAP11_SLOT_MILLIS le 122) - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricfilters=(HDFS_BYTES_READ%20lt%2060%20AND%20" + "MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" + @@ -1597,7 +1550,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } assertEquals(2, metricCnt); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricfilters=(HDFS_BYTES_READ%20lt%2060%20AND%20" + "MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" + @@ -1630,7 +1583,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // entity1. For ne, both entity1 and entity2 will be returned. For ene, // only entity2 will be returned as we are checking for existence too. // metricfilters=MAP11_SLOT_MILLIS ne 100 - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricfilters=MAP11_SLOT_MILLIS%20ne%20100"); resp = getResponse(client, uri); @@ -1642,7 +1595,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { entity.getId().equals("entity2")); } // metricfilters=MAP11_SLOT_MILLIS ene 100 - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricfilters=MAP11_SLOT_MILLIS%20ene%20100"); resp = getResponse(client, uri); @@ -1661,7 +1614,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetEntitiesEventFilters() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?eventfilters=event1,event3"); ClientResponse resp = getResponse(client, uri); @@ -1674,7 +1627,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { entity.getId().equals("entity2")); } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?eventfilters=!(event1,event3)"); resp = getResponse(client, uri); @@ -1683,7 +1636,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertEquals(0, entities.size()); // eventfilters=!(event1,event3) OR event5,event6 - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?eventfilters=!(event1,event3)%20OR%20event5,event6"); resp = getResponse(client, uri); @@ -1696,7 +1649,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { // eventfilters=(!(event1,event3) OR event5,event6) OR // (event1,event2 AND (event3,event4)) - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?eventfilters=(!(event1,event3)%20OR%20event5," + "event6)%20OR%20(event1,event2%20AND%20(event3,event4))"); @@ -1717,7 +1670,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetEntitiesRelationFilters() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?isrelatedto=type3:entity31,type2:entity21:entity22"); ClientResponse resp = getResponse(client, uri); @@ -1730,7 +1683,8 @@ public class TestTimelineReaderWebServicesHBaseStorage { entity.getId().equals("entity2")); } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + + uri = URI.create("http://localhost:" + getServerPort() + + "/ws/v2/timeline/" + "clusters/cluster1/apps/application_1111111111_1111/entities/type1" + "?isrelatedto=!(type3:entity31,type2:entity21:entity22)"); resp = getResponse(client, uri); @@ -1740,7 +1694,8 @@ public class TestTimelineReaderWebServicesHBaseStorage { // isrelatedto=!(type3:entity31,type2:entity21:entity22)OR type5:entity51, // type6:entity61:entity66 - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + + uri = URI.create("http://localhost:" + getServerPort() + + "/ws/v2/timeline/" + "clusters/cluster1/apps/application_1111111111_1111/entities/type1" + "?isrelatedto=!(type3:entity31,type2:entity21:entity22)%20OR%20" + "type5:entity51,type6:entity61:entity66"); @@ -1755,7 +1710,8 @@ public class TestTimelineReaderWebServicesHBaseStorage { // isrelatedto=(!(type3:entity31,type2:entity21:entity22)OR type5: // entity51,type6:entity61:entity66) OR (type1:entity14,type2:entity21: // entity22 AND (type3:entity32:entity35,type4:entity42)) - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + + uri = URI.create("http://localhost:" + getServerPort() + + "/ws/v2/timeline/" + "clusters/cluster1/apps/application_1111111111_1111/entities/type1" + "?isrelatedto=(!(type3:entity31,type2:entity21:entity22)%20OR%20" + "type5:entity51,type6:entity61:entity66)%20OR%20(type1:entity14," + @@ -1772,7 +1728,8 @@ public class TestTimelineReaderWebServicesHBaseStorage { // relatesto=!(type3:entity31,type2:entity21:entity22)OR type5:entity51, // type6:entity61:entity66 - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + + uri = URI.create("http://localhost:" + getServerPort() + + "/ws/v2/timeline/" + "clusters/cluster1/apps/application_1111111111_1111/entities/type1" + "?relatesto=!%20(type3:entity31,type2:entity21:entity22%20)%20OR%20" + "type5:entity51,type6:entity61:entity66"); @@ -1787,7 +1744,9 @@ public class TestTimelineReaderWebServicesHBaseStorage { // relatesto=(!(type3:entity31,type2:entity21:entity22)OR type5:entity51, // type6:entity61:entity66) OR (type1:entity14,type2:entity21:entity22 AND // (type3:entity32:entity35 , type4:entity42)) - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + + uri = + URI.create("http://localhost:" + getServerPort() + + "/ws/v2/timeline/" + "clusters/cluster1/apps/application_1111111111_1111/entities/type1" + "?relatesto=(!(%20type3:entity31,type2:entity21:entity22)%20OR%20" + "type5:entity51,type6:entity61:entity66%20)%20OR%20(type1:entity14," + @@ -1813,7 +1772,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetEntityDataToRetrieve() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1/entity2?confstoretrieve=cfg_,configuration_"); ClientResponse resp = getResponse(client, uri); @@ -1827,7 +1786,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { configKey.startsWith("cfg_")); } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1/entity2?confstoretrieve=!(cfg_,configuration_)"); resp = getResponse(client, uri); @@ -1840,7 +1799,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertTrue(configKey.startsWith("config_")); } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1/entity2?metricstoretrieve=MAP1_,HDFS_"); resp = getResponse(client, uri); @@ -1854,7 +1813,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { metric.getId().startsWith("HDFS_")); } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1/entity2?metricstoretrieve=!(MAP1_,HDFS_)"); resp = getResponse(client, uri); @@ -1869,7 +1828,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertEquals(1, metric.getValues().size()); } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1/entity2?metricstoretrieve=!(MAP1_,HDFS_)&" + "metricslimit=5"); @@ -1892,7 +1851,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetFlowRunApps() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" + "1002345678919/apps?fields=ALL"); ClientResponse resp = getResponse(client, uri); @@ -1912,7 +1871,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" + "1002345678919/apps?fields=ALL&metricslimit=2"); resp = getResponse(client, uri); @@ -1932,14 +1891,14 @@ public class TestTimelineReaderWebServicesHBaseStorage { } // Query without specifying cluster ID. - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/users/user1/flows/flow_name/runs/1002345678919/apps"); resp = getResponse(client, uri); entities = resp.getEntity(new GenericType>(){}); assertNotNull(entities); assertEquals(2, entities.size()); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/users/user1/flows/flow_name/runs/1002345678919/" + "apps?limit=1"); resp = getResponse(client, uri); @@ -1955,7 +1914,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetFlowApps() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" + "fields=ALL"); ClientResponse resp = getResponse(client, uri); @@ -1994,7 +1953,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { } } - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" + "fields=ALL&metricslimit=6"); resp = getResponse(client, uri); @@ -2038,14 +1997,14 @@ public class TestTimelineReaderWebServicesHBaseStorage { } // Query without specifying cluster ID. - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/users/user1/flows/flow_name/apps"); resp = getResponse(client, uri); entities = resp.getEntity(new GenericType>(){}); assertNotNull(entities); assertEquals(3, entities.size()); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/users/user1/flows/flow_name/apps?limit=1"); resp = getResponse(client, uri); entities = resp.getEntity(new GenericType>(){}); @@ -2061,7 +2020,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { Client client = createClient(); try { String entityType = TimelineEntityType.YARN_APPLICATION.toString(); - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" + "eventfilters=" + ApplicationMetricsConstants.FINISHED_EVENT_TYPE); ClientResponse resp = getResponse(client, uri); @@ -2072,7 +2031,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertTrue("Unexpected app in result", entities.contains( newEntity(entityType, "application_1111111111_1111"))); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" + "metricfilters=HDFS_BYTES_READ%20ge%200"); resp = getResponse(client, uri); @@ -2082,7 +2041,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertTrue("Unexpected app in result", entities.contains( newEntity(entityType, "application_1111111111_1111"))); - uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" + "conffilters=cfg1%20eq%20value1"); resp = getResponse(client, uri); @@ -2100,7 +2059,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetFlowRunNotPresent() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" + "1002345678929"); verifyHttpResponse(client, uri, Status.NOT_FOUND); @@ -2113,7 +2072,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetFlowsNotPresent() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster2/flows"); ClientResponse resp = getResponse(client, uri); Set entities = @@ -2130,7 +2089,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetAppNotPresent() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1378"); verifyHttpResponse(client, uri, Status.NOT_FOUND); } finally { @@ -2142,7 +2101,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetFlowRunAppsNotPresent() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster2/users/user1/flows/flow_name/runs/" + "1002345678919/apps"); ClientResponse resp = getResponse(client, uri); @@ -2160,7 +2119,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { public void testGetFlowAppsNotPresent() throws Exception { Client client = createClient(); try { - URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + + URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster2/users/user1/flows/flow_name55/apps"); ClientResponse resp = getResponse(client, uri); Set entities = @@ -2173,21 +2132,13 @@ public class TestTimelineReaderWebServicesHBaseStorage { } } - @After - public void stop() throws Exception { - if (server != null) { - server.stop(); - server = null; - } - } - @Test public void testGenericEntitiesForPagination() throws Exception { Client client = createClient(); try { int limit = 10; String queryParam = "?limit=" + limit; - String resourceUri = "http://localhost:" + serverPort + "/ws/v2/" + String resourceUri = "http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/entitytype"; URI uri = URI.create(resourceUri + queryParam); @@ -2251,7 +2202,8 @@ public class TestTimelineReaderWebServicesHBaseStorage { return entity; } - private void verifyFlowEntites(Client client, URI uri, int noOfEntities, + private List verifyFlowEntites(Client client, URI uri, + int noOfEntities, int[] a, String[] flowsInSequence) throws Exception { ClientResponse resp = getResponse(client, uri); List entities = @@ -2267,6 +2219,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { timelineEntity.getInfo().get("SYSTEM_INFO_FLOW_NAME")); assertEquals(a[count++], timelineEntity.getFlowRuns().size()); } + return entities; } @Test @@ -2275,7 +2228,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { try { // app entities stored is 15 during initialization. int totalAppEntities = 15; - String resourceUri = "http://localhost:" + serverPort + "/ws/v2/" + String resourceUri = "http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow1/apps"; URI uri = URI.create(resourceUri); ClientResponse resp = getResponse(client, uri); @@ -2319,7 +2272,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { try { // app entities stored is 15 during initialization. int totalAppEntities = 5; - String resourceUri = "http://localhost:" + serverPort + "/ws/v2/" + String resourceUri = "http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow1/runs/1/apps"; URI uri = URI.create(resourceUri); ClientResponse resp = getResponse(client, uri); @@ -2363,7 +2316,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { try { // app entities stored is 15 during initialization. int totalRuns = 3; - String resourceUri = "http://localhost:" + serverPort + "/ws/v2/" + String resourceUri = "http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow1/runs"; URI uri = URI.create(resourceUri); ClientResponse resp = getResponse(client, uri); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverterToString.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverterToString.java new file mode 100644 index 00000000000..1f52a7b6f89 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverterToString.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +/** + * Interface which has to be implemented for encoding and decoding row keys or + * column qualifiers as string. + */ +public interface KeyConverterToString { + /** + * Encode key as string. + * @param key of type T to be encoded as string. + * @return encoded value as string. + */ + String encodeAsString(T key); + + /** + * Decode row key from string to a key of type T. + * @param encodedKey string representation of row key + * @return type T which has been constructed after decoding string. + */ + T decodeFromString(String encodedKey); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java index bb77e36d4df..b8a5dba6cd6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java @@ -17,10 +17,14 @@ */ package org.apache.hadoop.yarn.server.timelineservice.storage.flow; +import java.util.List; + import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; /** @@ -32,8 +36,8 @@ public class FlowActivityRowKey { private final Long dayTs; private final String userId; private final String flowName; - private final KeyConverter flowActivityRowKeyConverter = - new FlowActivityRowKeyConverter(); + private final FlowActivityRowKeyConverter + flowActivityRowKeyConverter = new FlowActivityRowKeyConverter(); /** * @param clusterId identifying the cluster @@ -103,14 +107,33 @@ public class FlowActivityRowKey { return new FlowActivityRowKeyConverter().decode(rowKey); } + /** + * Constructs a row key for the flow activity table as follows: + * {@code clusterId!dayTimestamp!user!flowName}. + * @return String representation of row key + */ + public String getRowKeyAsString() { + return flowActivityRowKeyConverter.encodeAsString(this); + } + + /** + * Given the raw row key as string, returns the row key as an object. + * @param encodedRowKey String representation of row key. + * @return A FlowActivityRowKey object. + */ + public static FlowActivityRowKey parseRowKeyFromString(String encodedRowKey) { + return new FlowActivityRowKeyConverter().decodeFromString(encodedRowKey); + } + /** * Encodes and decodes row key for flow activity table. The row key is of the * form : clusterId!dayTimestamp!user!flowName. dayTimestamp(top of the day * timestamp) is a long and rest are strings. *

*/ - final private static class FlowActivityRowKeyConverter implements - KeyConverter { + final private static class FlowActivityRowKeyConverter + implements KeyConverter, + KeyConverterToString { private FlowActivityRowKeyConverter() { } @@ -192,5 +215,33 @@ public class FlowActivityRowKey { Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); return new FlowActivityRowKey(clusterId, dayTs, userId, flowName); } + + @Override + public String encodeAsString(FlowActivityRowKey key) { + if (key.getDayTimestamp() == null) { + return TimelineReaderUtils + .joinAndEscapeStrings(new String[] {key.clusterId}); + } else if (key.getUserId() == null) { + return TimelineReaderUtils.joinAndEscapeStrings( + new String[] {key.clusterId, key.dayTs.toString()}); + } else if (key.getFlowName() == null) { + return TimelineReaderUtils.joinAndEscapeStrings( + new String[] {key.clusterId, key.dayTs.toString(), key.userId}); + } + return TimelineReaderUtils.joinAndEscapeStrings(new String[] { + key.clusterId, key.dayTs.toString(), key.userId, key.flowName}); + } + + @Override + public FlowActivityRowKey decodeFromString(String encodedRowKey) { + List split = TimelineReaderUtils.split(encodedRowKey); + if (split == null || split.size() != 4) { + throw new IllegalArgumentException( + "Invalid row key for flow activity."); + } + Long dayTs = Long.valueOf(split.get(1)); + return new FlowActivityRowKey(split.get(0), dayTs, split.get(2), + split.get(3)); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index c741d0ed4b0..a1cdb29b124 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityCo import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; +import org.apache.hadoop.yarn.webapp.BadRequestException; import com.google.common.base.Preconditions; @@ -110,11 +112,30 @@ class FlowActivityEntityReader extends TimelineEntityReader { Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); String clusterId = getContext().getClusterId(); - if (getFilters().getCreatedTimeBegin() == 0L && - getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) { + if (getFilters().getFromId() == null + && getFilters().getCreatedTimeBegin() == 0L + && getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) { // All records have to be chosen. scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId) .getRowKeyPrefix()); + } else if (getFilters().getFromId() != null) { + FlowActivityRowKey key = null; + try { + key = + FlowActivityRowKey.parseRowKeyFromString(getFilters().getFromId()); + } catch (IllegalArgumentException e) { + throw new BadRequestException("Invalid filter fromid is provided."); + } + if (!clusterId.equals(key.getClusterId())) { + throw new BadRequestException( + "fromid doesn't belong to clusterId=" + clusterId); + } + scan.setStartRow(key.getRowKey()); + scan.setStopRow( + new FlowActivityRowKeyPrefix(clusterId, + (getFilters().getCreatedTimeBegin() <= 0 ? 0 + : (getFilters().getCreatedTimeBegin() - 1))) + .getRowKeyPrefix()); } else { scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters() .getCreatedTimeEnd()).getRowKeyPrefix()); @@ -157,7 +178,8 @@ class FlowActivityEntityReader extends TimelineEntityReader { flowRun.setId(flowRun.getId()); flowActivity.addFlowRun(flowRun); } - + flowActivity.getInfo().put(TimelineReaderUtils.FROMID_KEY, + rowKey.getRowKeyAsString()); return flowActivity; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java index cbd2273dc26..bac5f857cef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; @@ -224,6 +225,26 @@ public class TestRowKeys { verifyRowPrefixBytes(byteRowKeyPrefix); } + @Test + public void testFlowActivityRowKeyAsString() { + String cluster = "cl" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR + "uster" + + TimelineReaderUtils.DEFAULT_ESCAPE_CHAR; + String user = TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "user"; + String fName = "dummy_" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR + + TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "flow" + + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR; + Long ts = 1459900830000L; + Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts); + String rowKeyAsString = + new FlowActivityRowKey(cluster, ts, user, fName).getRowKeyAsString(); + FlowActivityRowKey rowKey = + FlowActivityRowKey.parseRowKeyFromString(rowKeyAsString); + assertEquals(cluster, rowKey.getClusterId()); + assertEquals(dayTimestamp, rowKey.getDayTimestamp()); + assertEquals(user, rowKey.getUserId()); + assertEquals(fName, rowKey.getFlowName()); + } + @Test public void testFlowRunRowKey() { byte[] byteRowKey = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java index b45fd36ba81..cf0fa50aef9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java @@ -175,7 +175,7 @@ public class TimelineReaderServer extends CompositeService { } @VisibleForTesting - int getWebServerPort() { + public int getWebServerPort() { return readerWebServer.getConnectorAddress(0).getPort(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java index c93c631b364..8f9243359ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java @@ -24,13 +24,29 @@ import java.util.List; import org.apache.commons.lang.StringUtils; +import com.google.common.annotations.VisibleForTesting; + /** * Set of utility methods to be used across timeline reader. */ -final class TimelineReaderUtils { +public final class TimelineReaderUtils { private TimelineReaderUtils() { } + /** + * Default delimiter for joining strings. + */ + @VisibleForTesting + public static final char DEFAULT_DELIMITER_CHAR = '!'; + + /** + * Default escape character used for joining strings. + */ + @VisibleForTesting + public static final char DEFAULT_ESCAPE_CHAR = '*'; + + public static final String FROMID_KEY = "FROM_ID"; + /** * Split the passed string along the passed delimiter character while looking * for escape char to interpret the splitted parts correctly. For delimiter or @@ -168,4 +184,14 @@ final class TimelineReaderUtils { // Join the strings after they have been escaped. return StringUtils.join(strs, delimiterChar); } + + public static List split(final String str) + throws IllegalArgumentException { + return split(str, DEFAULT_DELIMITER_CHAR, DEFAULT_ESCAPE_CHAR); + } + + public static String joinAndEscapeStrings(final String[] strs) { + return joinAndEscapeStrings(strs, DEFAULT_DELIMITER_CHAR, + DEFAULT_ESCAPE_CHAR); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index df3ccabc9a7..1f82d912f77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -1333,6 +1333,10 @@ public class TimelineReaderWebServices { * 2 dates. * "daterange=20150711-" returns flows active on and after 20150711. * "daterange=-20150711" returns flows active on and before 20150711. + * @param fromId If specified, retrieve the next set of flows from the given + * fromId. The set of flows retrieved is inclusive of specified fromId. + * fromId should be taken from the value associated with FROM_ID info key + * in flow entity response which was sent earlier. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * set of FlowActivityEntity instances are returned.
@@ -1349,8 +1353,9 @@ public class TimelineReaderWebServices { @Context HttpServletRequest req, @Context HttpServletResponse res, @QueryParam("limit") String limit, - @QueryParam("daterange") String dateRange) { - return getFlows(req, res, null, limit, dateRange); + @QueryParam("daterange") String dateRange, + @QueryParam("fromid") String fromId) { + return getFlows(req, res, null, limit, dateRange, fromId); } /** @@ -1379,6 +1384,10 @@ public class TimelineReaderWebServices { * 2 dates. * "daterange=20150711-" returns flows active on and after 20150711. * "daterange=-20150711" returns flows active on and before 20150711. + * @param fromId If specified, retrieve the next set of flows from the given + * fromId. The set of flows retrieved is inclusive of specified fromId. + * fromId should be taken from the value associated with FROM_ID info key + * in flow entity response which was sent earlier. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * set of FlowActivityEntity instances are returned.
@@ -1396,7 +1405,8 @@ public class TimelineReaderWebServices { @Context HttpServletResponse res, @PathParam("clusterid") String clusterId, @QueryParam("limit") String limit, - @QueryParam("daterange") String dateRange) { + @QueryParam("daterange") String dateRange, + @QueryParam("fromid") String fromId) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -1413,7 +1423,7 @@ public class TimelineReaderWebServices { TimelineEntityFilters entityFilters = TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, null, null, null, null, null, null, null, null, null, - null); + fromId); entityFilters.setCreatedTimeBegin(range.dateStart); entityFilters.setCreatedTimeEnd(range.dateEnd); entities = timelineReaderManager.getEntities( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java index 780cfd063d2..52e24e10a17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java @@ -195,39 +195,29 @@ enum TimelineUIDConverter { }; /** - * Delimiter used for UID. - */ - public static final char UID_DELIMITER_CHAR = '!'; - - /** - * Escape Character used if delimiter or escape character itself is part of - * different components of UID. - */ - public static final char UID_ESCAPE_CHAR = '*'; - - /** - * Split UID using {@link #UID_DELIMITER_CHAR} and {@link #UID_ESCAPE_CHAR}. + * Split UID using {@link TimelineReaderUtils#DEFAULT_DELIMITER_CHAR} and + * {@link TimelineReaderUtils#DEFAULT_ESCAPE_CHAR}. * @param uid UID to be splitted. * @return a list of different parts of UID split across delimiter. * @throws IllegalArgumentException if UID is not properly escaped. */ private static List splitUID(String uid) throws IllegalArgumentException { - return TimelineReaderUtils.split(uid, UID_DELIMITER_CHAR, UID_ESCAPE_CHAR); + return TimelineReaderUtils.split(uid); } /** - * Join different parts of UID delimited by {@link #UID_DELIMITER_CHAR} with - * delimiter and escape character escaped using {@link #UID_ESCAPE_CHAR} if - * UID parts contain them. + * Join different parts of UID delimited by + * {@link TimelineReaderUtils#DEFAULT_DELIMITER_CHAR} with delimiter and + * escape character escaped using + * {@link TimelineReaderUtils#DEFAULT_ESCAPE_CHAR} if UID parts contain them. * @param parts an array of UID parts to be joined. * @return a string joined using the delimiter with escape and delimiter - * characters escaped if they are part of the string parts to be joined. - * Returns null if one of the parts is null. + * characters escaped if they are part of the string parts to be + * joined. Returns null if one of the parts is null. */ private static String joinAndEscapeUIDParts(String[] parts) { - return TimelineReaderUtils.joinAndEscapeStrings(parts, UID_DELIMITER_CHAR, - UID_ESCAPE_CHAR); + return TimelineReaderUtils.joinAndEscapeStrings(parts); } /**