From ccaa99c9235e0c6922e56f93254583eca49a12e1 Mon Sep 17 00:00:00 2001 From: Akira Ajisaka Date: Thu, 11 Jul 2019 13:49:42 +0900 Subject: [PATCH] HADOOP-16381. The JSON License is included in binary tarball via azure-documentdb:1.16.2. Contributed by Sushil Ks. --- .../pom.xml | 40 +++- .../documentstore/DocumentStoreUtils.java | 27 ++- .../document/entity/TimelineEventSubDoc.java | 7 +- .../document/entity/TimelineMetricSubDoc.java | 6 - .../cosmosdb/CosmosDBDocumentStoreReader.java | 115 ++++++---- .../cosmosdb/CosmosDBDocumentStoreWriter.java | 203 +++++++++++------- .../TestCosmosDBDocumentStoreReader.java | 67 ++++++ .../MockedCosmosDBDocumentStoreWriter.java | 47 ++++ .../TestCosmosDBDocumentStoreWriter.java | 96 +++++++++ 9 files changed, 459 insertions(+), 149 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/TestCosmosDBDocumentStoreReader.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/MockedCosmosDBDocumentStoreWriter.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/TestCosmosDBDocumentStoreWriter.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml index 4a6cf7ab15e..18ccb243541 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml @@ -28,7 +28,8 @@ <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir> - <azure.documentdb.version>1.16.2</azure.documentdb.version> + <azure.documentdb.version>2.4.5</azure.documentdb.version> + <rxjava.version>1.3.8</rxjava.version> @@ -44,11 +45,6 @@ <type>test-jar</type> <scope>test</scope> - <dependency> - <groupId>com.microsoft.azure</groupId> - <artifactId>azure-documentdb</artifactId> - <version>${azure.documentdb.version}</version> - </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> @@ -84,6 +80,38 @@ + <dependency> + <groupId>io.reactivex</groupId> + <artifactId>rxjava</artifactId> + <version>${rxjava.version}</version> + </dependency> + <dependency> + <groupId>com.microsoft.azure</groupId> + <artifactId>azure-cosmosdb</artifactId> + <version>${azure.documentdb.version}</version> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-codec-http</artifactId> + </exclusion> + <exclusion> + <groupId>io.reactivex</groupId> + <artifactId>rxjava</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-transport</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-handler-proxy</artifactId> + </exclusion> + </exclusions> + </dependency> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java index 4b14d47c6a3..6833502f83f 100755 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java @@ -18,9 +18,9 @@ package org.apache.hadoop.yarn.server.timelineservice.documentstore; -import com.microsoft.azure.documentdb.ConnectionPolicy; -import
com.microsoft.azure.documentdb.ConsistencyLevel; -import com.microsoft.azure.documentdb.DocumentClient; +import com.microsoft.azure.cosmosdb.ConnectionPolicy; +import com.microsoft.azure.cosmosdb.ConsistencyLevel; +import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; @@ -134,6 +134,10 @@ public final class DocumentStoreUtils { * @return false if any of the string is null or empty else true */ public static boolean isNullOrEmpty(String...values) { + if (values == null || values.length == 0) { + return true; + } + for (String value : values) { if (value == null || value.isEmpty()) { return true; @@ -143,15 +147,20 @@ public final class DocumentStoreUtils { } /** - * Creates CosmosDB Document Client. + * Creates CosmosDB Async Document Client. * @param conf * to retrieve cosmos db endpoint and key * @return async document client for CosmosDB */ - public static DocumentClient createCosmosDBClient(Configuration conf){ - return new DocumentClient(DocumentStoreUtils.getCosmosDBEndpoint(conf), - DocumentStoreUtils.getCosmosDBMasterKey(conf), - ConnectionPolicy.GetDefault(), ConsistencyLevel.Session); + public static AsyncDocumentClient createCosmosDBAsyncClient( + Configuration conf){ + return new AsyncDocumentClient.Builder() + .withServiceEndpoint(DocumentStoreUtils.getCosmosDBEndpoint(conf)) + .withMasterKeyOrResourceToken( + DocumentStoreUtils.getCosmosDBMasterKey(conf)) + .withConnectionPolicy(ConnectionPolicy.GetDefault()) + .withConsistencyLevel(ConsistencyLevel.Session) + .build(); } /** @@ -486,4 +495,4 @@ public final class DocumentStoreUtils { return false; } } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java index cce64ab5f1e..6a46d08764e 100755 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java @@ -30,7 +30,6 @@ import java.util.Map; public class TimelineEventSubDoc { private final TimelineEvent timelineEvent; - private boolean valid; public TimelineEventSubDoc() { timelineEvent = new TimelineEvent(); @@ -51,11 +50,7 @@ public class TimelineEventSubDoc { public boolean isValid() { return timelineEvent.isValid(); } - - public void setValid(boolean valid) { - this.valid = valid; - } - + public long getTimestamp() { return timelineEvent.getTimestamp(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java index f7d078f6890..c0a569baa12 100755 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java @@ -31,7 +31,6 @@ import java.util.TreeMap; public class TimelineMetricSubDoc { private final TimelineMetric timelineMetric; - private boolean valid; private long singleDataTimestamp; private Number singleDataValue = 0; @@ -41,7 +40,6 @@ public class TimelineMetricSubDoc { public TimelineMetricSubDoc(TimelineMetric timelineMetric) { this.timelineMetric = timelineMetric; - this.valid = timelineMetric.isValid(); if (timelineMetric.getType() == TimelineMetric.Type.SINGLE_VALUE && timelineMetric.getValues().size() > 0) { this.singleDataTimestamp = timelineMetric.getSingleDataTimestamp(); @@ -130,10 +128,6 @@ public class TimelineMetricSubDoc { timelineMetric.setType(metricType); } - public void setValid(boolean valid) { - this.valid = valid; - } - public boolean isValid() { return (timelineMetric.getId() != null); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java index 834ebd55aee..1df89dee8b8 100755 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java @@ -18,9 +18,11 @@ package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.cosmosdb; -import com.microsoft.azure.documentdb.Document; -import com.microsoft.azure.documentdb.DocumentClient; -import com.microsoft.azure.documentdb.FeedOptions; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import com.microsoft.azure.cosmosdb.FeedOptions; +import com.microsoft.azure.cosmosdb.FeedResponse; +import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils; @@ -30,12 +32,14 @@ import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentS import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DocumentStoreReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rx.Observable; +import rx.Scheduler; +import 
rx.schedulers.Schedulers; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** @@ -49,7 +53,7 @@ public class CosmosDBDocumentStoreReader .getLogger(CosmosDBDocumentStoreReader.class); private static final int DEFAULT_DOCUMENTS_SIZE = 1; - private static volatile DocumentClient client; + private static AsyncDocumentClient client; private final String databaseName; private final static String COLLECTION_LINK = "/dbs/%s/colls/%s"; private final static String SELECT_TOP_FROM_COLLECTION = "SELECT TOP %d * " + @@ -66,17 +70,24 @@ public class CosmosDBDocumentStoreReader "\"%s\") "; private final static String ORDER_BY_CLAUSE = " ORDER BY c.createdTime"; + // creating thread pool of size, half of the total available threads from JVM + private static ExecutorService executorService = Executors.newFixedThreadPool( + Runtime.getRuntime().availableProcessors() / 2); + private static Scheduler schedulerForBlockingWork = + Schedulers.from(executorService); + public CosmosDBDocumentStoreReader(Configuration conf) { LOG.info("Initializing Cosmos DB DocumentStoreReader..."); databaseName = DocumentStoreUtils.getCosmosDBDatabaseName(conf); - // making CosmosDB Client Singleton + initCosmosDBClient(conf); + } + + private synchronized void initCosmosDBClient(Configuration conf) { + // making CosmosDB Async Client Singleton if (client == null) { - synchronized (this) { - if (client == null) { - LOG.info("Creating Cosmos DB Client..."); - client = DocumentStoreUtils.createCosmosDBClient(conf); - } - } + LOG.info("Creating Cosmos DB Reader Async Client..."); + client = DocumentStoreUtils.createCosmosDBAsyncClient(conf); + addShutdownHook(); } } @@ -104,15 +115,16 @@ public class CosmosDBDocumentStoreReader LOG.debug("Querying Collection : {} , with query {}", collectionName, sqlQuery); - Set entityTypes = new HashSet<>(); - Iterator documentIterator = client.queryDocuments( + return Sets.newHashSet(client.queryDocuments( String.format(COLLECTION_LINK, databaseName, collectionName), - sqlQuery, null).getQueryIterator(); - while (documentIterator.hasNext()) { - Document document = documentIterator.next(); - entityTypes.add(document.getString(ENTITY_TYPE_COLUMN)); - } - return entityTypes; + sqlQuery, new FeedOptions()) + .map(FeedResponse::getResults) // Map the page to the list of documents + .concatMap(Observable::from) + .map(document -> String.valueOf(document.get(ENTITY_TYPE_COLUMN))) + .toList() + .subscribeOn(schedulerForBlockingWork) + .toBlocking() + .single()); } @Override @@ -133,25 +145,25 @@ public class CosmosDBDocumentStoreReader final long maxDocumentsSize) { final String sqlQuery = buildQueryWithPredicates(context, collectionName, maxDocumentsSize); - List timelineDocs = new ArrayList<>(); LOG.debug("Querying Collection : {} , with query {}", collectionName, sqlQuery); - FeedOptions feedOptions = new FeedOptions(); - feedOptions.setPageSize((int) maxDocumentsSize); - Iterator documentIterator = client.queryDocuments( - String.format(COLLECTION_LINK, databaseName, collectionName), - sqlQuery, feedOptions).getQueryIterator(); - while (documentIterator.hasNext()) { - Document document = documentIterator.next(); - TimelineDoc resultDoc = document.toObject(docClass); - if (resultDoc.getCreatedTime() == 0 && - document.getTimestamp() != null) { - resultDoc.setCreatedTime(document.getTimestamp().getTime()); - } - 
timelineDocs.add(resultDoc); - } - return timelineDocs; + return client.queryDocuments(String.format(COLLECTION_LINK, + databaseName, collectionName), sqlQuery, new FeedOptions()) + .map(FeedResponse::getResults) // Map the page to the list of documents + .concatMap(Observable::from) + .map(document -> { + TimelineDoc resultDoc = document.toObject(docClass); + if (resultDoc.getCreatedTime() == 0 && + document.getTimestamp() != null) { + resultDoc.setCreatedTime(document.getTimestamp().getTime()); + } + return resultDoc; + }) + .toList() + .subscribeOn(schedulerForBlockingWork) + .toBlocking() + .single(); } private String buildQueryWithPredicates(TimelineReaderContext context, @@ -168,33 +180,34 @@ public class CosmosDBDocumentStoreReader return addPredicates(context, collectionName, queryStrBuilder); } - private String addPredicates(TimelineReaderContext context, + @VisibleForTesting + String addPredicates(TimelineReaderContext context, String collectionName, StringBuilder queryStrBuilder) { boolean hasPredicate = false; queryStrBuilder.append(WHERE_CLAUSE); - if (context.getClusterId() != null) { + if (!DocumentStoreUtils.isNullOrEmpty(context.getClusterId())) { hasPredicate = true; queryStrBuilder.append(String.format(CONTAINS_FUNC_FOR_ID, context.getClusterId())); } - if (context.getUserId() != null) { + if (!DocumentStoreUtils.isNullOrEmpty(context.getUserId())) { hasPredicate = true; queryStrBuilder.append(AND_OPERATOR) .append(String.format(CONTAINS_FUNC_FOR_ID, context.getUserId())); } - if (context.getFlowName() != null) { + if (!DocumentStoreUtils.isNullOrEmpty(context.getFlowName())) { hasPredicate = true; queryStrBuilder.append(AND_OPERATOR) .append(String.format(CONTAINS_FUNC_FOR_ID, context.getFlowName())); } - if (context.getAppId() != null) { + if (!DocumentStoreUtils.isNullOrEmpty(context.getAppId())) { hasPredicate = true; queryStrBuilder.append(AND_OPERATOR) .append(String.format(CONTAINS_FUNC_FOR_ID, context.getAppId())); } - if (context.getEntityId() != null) { + if (!DocumentStoreUtils.isNullOrEmpty(context.getEntityId())) { hasPredicate = true; queryStrBuilder.append(AND_OPERATOR) .append(String.format(CONTAINS_FUNC_FOR_ID, context.getEntityId())); @@ -204,7 +217,7 @@ public class CosmosDBDocumentStoreReader queryStrBuilder.append(AND_OPERATOR) .append(String.format(CONTAINS_FUNC_FOR_ID, context.getFlowRunId())); } - if (context.getEntityType() != null){ + if (!DocumentStoreUtils.isNullOrEmpty(context.getEntityType())){ hasPredicate = true; queryStrBuilder.append(AND_OPERATOR) .append(String.format(CONTAINS_FUNC_FOR_TYPE, @@ -224,9 +237,17 @@ public class CosmosDBDocumentStoreReader @Override public synchronized void close() { if (client != null) { - LOG.info("Closing Cosmos DB Client..."); + LOG.info("Closing Cosmos DB Reader Async Client..."); client.close(); client = null; } } -} \ No newline at end of file + + private void addShutdownHook() { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + if (executorService != null) { + executorService.shutdown(); + } + })); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java index 16ed207be9e..32965840357 
100755 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java @@ -19,14 +19,20 @@ package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb; -import com.microsoft.azure.documentdb.AccessCondition; -import com.microsoft.azure.documentdb.AccessConditionType; -import com.microsoft.azure.documentdb.Database; -import com.microsoft.azure.documentdb.Document; -import com.microsoft.azure.documentdb.DocumentClient; -import com.microsoft.azure.documentdb.DocumentClientException; -import com.microsoft.azure.documentdb.DocumentCollection; -import com.microsoft.azure.documentdb.RequestOptions; +import com.google.common.annotations.VisibleForTesting; +import com.microsoft.azure.cosmosdb.AccessCondition; +import com.microsoft.azure.cosmosdb.AccessConditionType; +import com.microsoft.azure.cosmosdb.Database; +import com.microsoft.azure.cosmosdb.Document; +import com.microsoft.azure.cosmosdb.DocumentClientException; +import com.microsoft.azure.cosmosdb.DocumentCollection; +import com.microsoft.azure.cosmosdb.FeedResponse; +import com.microsoft.azure.cosmosdb.RequestOptions; +import com.microsoft.azure.cosmosdb.ResourceResponse; +import com.microsoft.azure.cosmosdb.SqlParameter; +import com.microsoft.azure.cosmosdb.SqlParameterCollection; +import com.microsoft.azure.cosmosdb.SqlQuerySpec; +import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.server.timelineservice.metrics.PerNodeAggTimelineCollectorMetrics; @@ -40,6 +46,13 @@ import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentS import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rx.Observable; +import rx.Scheduler; +import rx.functions.Func1; +import rx.schedulers.Schedulers; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * This is the Document Store Writer implementation for @@ -51,79 +64,102 @@ public class CosmosDBDocumentStoreWriter private static final Logger LOG = LoggerFactory .getLogger(CosmosDBDocumentStoreWriter.class); - private static volatile DocumentClient client; private final String databaseName; private static final PerNodeAggTimelineCollectorMetrics METRICS = PerNodeAggTimelineCollectorMetrics.getInstance(); + + private static AsyncDocumentClient client; + // creating thread pool of size equal to number of collection types + private ExecutorService executorService = + Executors.newFixedThreadPool(CollectionType.values().length); + private Scheduler schedulerForBlockingWork = + Schedulers.from(executorService); + private static final String DATABASE_LINK = "/dbs/%s"; private static final String COLLECTION_LINK = DATABASE_LINK + "/colls/%s"; private static final String DOCUMENT_LINK = COLLECTION_LINK + "/docs/%s"; + private static final String ID = "@id"; + private static final String QUERY_COLLECTION_IF_EXISTS = "SELECT * FROM r " + + "where r.id = " + ID; public CosmosDBDocumentStoreWriter(Configuration conf) { 
LOG.info("Initializing Cosmos DB DocumentStoreWriter..."); databaseName = DocumentStoreUtils.getCosmosDBDatabaseName(conf); - // making CosmosDB Client Singleton + initCosmosDBClient(conf); + } + + private synchronized void initCosmosDBClient(Configuration conf) { + // making CosmosDB Async Client Singleton if (client == null) { - synchronized (this) { - if (client == null) { - LOG.info("Creating Cosmos DB Client..."); - client = DocumentStoreUtils.createCosmosDBClient(conf); - } - } + LOG.info("Creating Cosmos DB Writer Async Client..."); + client = DocumentStoreUtils.createCosmosDBAsyncClient(conf); + addShutdownHook(); } } @Override public void createDatabase() { - try { - client.readDatabase(String.format( - DATABASE_LINK, databaseName), new RequestOptions()); - LOG.info("Database {} already exists.", databaseName); - } catch (DocumentClientException docExceptionOnRead) { - if (docExceptionOnRead.getStatusCode() == 404) { - LOG.info("Creating new Database : {}", databaseName); - Database databaseDefinition = new Database(); - databaseDefinition.setId(databaseName); - try { - client.createDatabase(databaseDefinition, new RequestOptions()); - } catch (DocumentClientException docExceptionOnCreate) { - LOG.error("Unable to create new Database : {}", databaseName, - docExceptionOnCreate); - } - } else { - LOG.error("Error while reading Database : {}", databaseName, - docExceptionOnRead); - } - } + Observable<ResourceResponse<Database>> databaseReadObs = + client.readDatabase(String.format(DATABASE_LINK, databaseName), null); + + Observable<ResourceResponse<Database>> databaseExistenceObs = + databaseReadObs + .doOnNext(databaseResourceResponse -> + LOG.info("Database {} already exists.", databaseName)) + .onErrorResumeNext(throwable -> { + // if the database doesn't exist, + // readDatabase() will result in a 404 error + if (throwable instanceof DocumentClientException) { + DocumentClientException de = + (DocumentClientException) throwable; + if (de.getStatusCode() == 404) { + // if the database doesn't exist, create it. + LOG.info("Creating new Database : {}", databaseName); + + Database dbDefinition = new Database(); + dbDefinition.setId(databaseName); + + return client.createDatabase(dbDefinition, null); + } + } + // some unexpected failure in reading the database happened. + // pass the error up.
+          LOG.error("Error while checking if Database : {} exists.", + databaseName, throwable); + return Observable.error(throwable); + }); + // wait for completion + databaseExistenceObs.toCompletable().await(); } @Override public void createCollection(final String collectionName) { LOG.info("Creating Timeline Collection : {} for Database : {}", collectionName, databaseName); - try { - client.readCollection(String.format(COLLECTION_LINK, databaseName, - collectionName), new RequestOptions()); - LOG.info("Collection {} already exists.", collectionName); - } catch (DocumentClientException docExceptionOnRead) { - if (docExceptionOnRead.getStatusCode() == 404) { - DocumentCollection collection = new DocumentCollection(); - collection.setId(collectionName); - LOG.info("Creating collection {} under Database {}", - collectionName, databaseName); - try { - client.createCollection( - String.format(DATABASE_LINK, databaseName), - collection, new RequestOptions()); - } catch (DocumentClientException docExceptionOnCreate) { - LOG.error("Unable to create Collection : {} under Database : {}", - collectionName, databaseName, docExceptionOnCreate); - } - } else { - LOG.error("Error while reading Collection : {} under Database : {}", - collectionName, databaseName, docExceptionOnRead); - } - } + client.queryCollections(String.format(DATABASE_LINK, databaseName), + new SqlQuerySpec(QUERY_COLLECTION_IF_EXISTS, + new SqlParameterCollection( + new SqlParameter(ID, collectionName))), null) + .single() // there should be a single page of results + .flatMap((Func1<FeedResponse<DocumentCollection>, Observable<ResourceResponse<DocumentCollection>>>) + page -> { + if (page.getResults().isEmpty()) { + // if there is no matching collection, create one. + DocumentCollection collection = new DocumentCollection(); + collection.setId(collectionName); + LOG.info("Creating collection {}", collectionName); + return client.createCollection( + String.format(DATABASE_LINK, databaseName), + collection, null); + } else { + // collection already exists, nothing else to be done.
+ LOG.info("Collection {} already exists.", collectionName); + return Observable.empty(); + } + }) + .doOnError(throwable -> LOG.error("Unable to create collection : {}", + collectionName, throwable)) + .toCompletable().await(); } @Override @@ -156,32 +192,40 @@ public class CosmosDBDocumentStoreWriter AccessCondition accessCondition = new AccessCondition(); StringBuilder eTagStrBuilder = new StringBuilder(); - TimelineDoc updatedTimelineDoc = applyUpdatesOnPrevDoc(collectionType, + final TimelineDoc updatedTimelineDoc = applyUpdatesOnPrevDoc(collectionType, timelineDoc, eTagStrBuilder); accessCondition.setCondition(eTagStrBuilder.toString()); accessCondition.setType(AccessConditionType.IfMatch); requestOptions.setAccessCondition(accessCondition); - try { - client.upsertDocument(collectionLink, updatedTimelineDoc, - requestOptions, true); + ResourceResponse resourceResponse = + client.upsertDocument(collectionLink, updatedTimelineDoc, + requestOptions, true) + .subscribeOn(schedulerForBlockingWork) + .doOnError(throwable -> + LOG.error("Error while upserting Collection : {} " + + "with Doc Id : {} under Database : {}", + collectionType.getCollectionName(), + updatedTimelineDoc.getId(), databaseName, throwable)) + .toBlocking() + .single(); + + if (resourceResponse.getStatusCode() == 409) { + LOG.warn("There was a conflict while upserting, hence retrying...", + resourceResponse); + upsertDocument(collectionType, updatedTimelineDoc); + } else if (resourceResponse.getStatusCode() >= 200 && resourceResponse + .getStatusCode() < 300) { LOG.debug("Successfully wrote doc with id : {} and type : {} under " + "Database : {}", timelineDoc.getId(), timelineDoc.getType(), databaseName); - } catch (DocumentClientException e) { - if (e.getStatusCode() == 409) { - LOG.warn("There was a conflict while upserting, hence retrying...", e); - upsertDocument(collectionType, updatedTimelineDoc); - } - LOG.error("Error while upserting Collection : {} with Doc Id : {} under" + - " Database : {}", collectionType.getCollectionName(), - updatedTimelineDoc.getId(), databaseName, e); } } + @VisibleForTesting @SuppressWarnings("unchecked") - private TimelineDoc applyUpdatesOnPrevDoc(CollectionType collectionType, + TimelineDoc applyUpdatesOnPrevDoc(CollectionType collectionType, TimelineDoc timelineDoc, StringBuilder eTagStrBuilder) { TimelineDoc prevDocument = fetchLatestDoc(collectionType, timelineDoc.getId(), eTagStrBuilder); @@ -192,14 +236,15 @@ public class CosmosDBDocumentStoreWriter return timelineDoc; } + @VisibleForTesting @SuppressWarnings("unchecked") - private TimelineDoc fetchLatestDoc(final CollectionType collectionType, + TimelineDoc fetchLatestDoc(final CollectionType collectionType, final String documentId, StringBuilder eTagStrBuilder) { final String documentLink = String.format(DOCUMENT_LINK, databaseName, collectionType.getCollectionName(), documentId); try { Document latestDocument = client.readDocument(documentLink, new - RequestOptions()).getResource(); + RequestOptions()).toBlocking().single().getResource(); TimelineDoc timelineDoc; switch (collectionType) { case FLOW_RUN: @@ -227,9 +272,17 @@ public class CosmosDBDocumentStoreWriter @Override public synchronized void close() { if (client != null) { - LOG.info("Closing Cosmos DB Client..."); + LOG.info("Closing Cosmos DB Writer Async Client..."); client.close(); client = null; } } + + private void addShutdownHook() { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + if (executorService != null) { + executorService.shutdown(); + } + 
})); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/TestCosmosDBDocumentStoreReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/TestCosmosDBDocumentStoreReader.java new file mode 100644 index 00000000000..416f1b84d99 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/TestCosmosDBDocumentStoreReader.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.cosmosdb; + +import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +/** + * Test case for {@link CosmosDBDocumentStoreReader}. 
+ */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(DocumentStoreUtils.class) +public class TestCosmosDBDocumentStoreReader { + + @Before + public void setUp(){ + AsyncDocumentClient asyncDocumentClient = + Mockito.mock(AsyncDocumentClient.class); + PowerMockito.mockStatic(DocumentStoreUtils.class); + PowerMockito.when(DocumentStoreUtils.getCosmosDBDatabaseName( + ArgumentMatchers.any(Configuration.class))) + .thenReturn("FooBar"); + PowerMockito.when(DocumentStoreUtils.createCosmosDBAsyncClient( + ArgumentMatchers.any(Configuration.class))) + .thenReturn(asyncDocumentClient); + } + + @Test(expected = IllegalArgumentException.class) + public void testFailureFOnEmptyPredicates() { + PowerMockito.when(DocumentStoreUtils.isNullOrEmpty( + ArgumentMatchers.any())) + .thenReturn(Boolean.TRUE); + + CosmosDBDocumentStoreReader cosmosDBDocumentStoreReader = + new CosmosDBDocumentStoreReader(null); + cosmosDBDocumentStoreReader.addPredicates( + new TimelineReaderContext(null, "", "", + null, "", "", null), + "DummyCollection", new StringBuilder()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/MockedCosmosDBDocumentStoreWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/MockedCosmosDBDocumentStoreWriter.java new file mode 100644 index 00000000000..ab0065eecae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/MockedCosmosDBDocumentStoreWriter.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreTestUtils; +import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType; +import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument; + +import java.io.IOException; + +/** + * This is a mocked class for {@link CosmosDBDocumentStoreWriter}. 
+ */ +public class MockedCosmosDBDocumentStoreWriter + extends CosmosDBDocumentStoreWriter { + + public MockedCosmosDBDocumentStoreWriter(Configuration conf) { + super(conf); + } + + @Override + TimelineDocument fetchLatestDoc(CollectionType collectionType, + String documentId, StringBuilder eTagStrBuilder) { + try { + return DocumentStoreTestUtils.bakeTimelineEntityDoc(); + } catch (IOException e) { + return null; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/TestCosmosDBDocumentStoreWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/TestCosmosDBDocumentStoreWriter.java new file mode 100644 index 00000000000..783d04ecc39 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/TestCosmosDBDocumentStoreWriter.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb; + +import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreTestUtils; +import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils; +import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType; +import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.IOException; + +/** + * Test case for {@link CosmosDBDocumentStoreWriter}. 
+ */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(DocumentStoreUtils.class) +public class TestCosmosDBDocumentStoreWriter { + + @Before + public void setUp() { + AsyncDocumentClient asyncDocumentClient = + Mockito.mock(AsyncDocumentClient.class); + PowerMockito.mockStatic(DocumentStoreUtils.class); + PowerMockito.when(DocumentStoreUtils.getCosmosDBDatabaseName( + ArgumentMatchers.any(Configuration.class))) + .thenReturn("FooBar"); + PowerMockito.when(DocumentStoreUtils.createCosmosDBAsyncClient( + ArgumentMatchers.any(Configuration.class))) + .thenReturn(asyncDocumentClient); + } + + @SuppressWarnings("unchecked") + @Test + public void applyingUpdatesOnPrevDocTest() throws IOException { + MockedCosmosDBDocumentStoreWriter documentStoreWriter = + new MockedCosmosDBDocumentStoreWriter(null); + + TimelineEntityDocument actualEntityDoc = + new TimelineEntityDocument(); + TimelineEntityDocument expectedEntityDoc = + DocumentStoreTestUtils.bakeTimelineEntityDoc(); + + Assert.assertEquals(1, actualEntityDoc.getInfo().size()); + Assert.assertEquals(0, actualEntityDoc.getMetrics().size()); + Assert.assertEquals(0, actualEntityDoc.getEvents().size()); + Assert.assertEquals(0, actualEntityDoc.getConfigs().size()); + Assert.assertEquals(0, + actualEntityDoc.getIsRelatedToEntities().size()); + Assert.assertEquals(0, actualEntityDoc. + getRelatesToEntities().size()); + + actualEntityDoc = (TimelineEntityDocument) documentStoreWriter + .applyUpdatesOnPrevDoc(CollectionType.ENTITY, + actualEntityDoc, null); + + Assert.assertEquals(expectedEntityDoc.getInfo().size(), + actualEntityDoc.getInfo().size()); + Assert.assertEquals(expectedEntityDoc.getMetrics().size(), + actualEntityDoc.getMetrics().size()); + Assert.assertEquals(expectedEntityDoc.getEvents().size(), + actualEntityDoc.getEvents().size()); + Assert.assertEquals(expectedEntityDoc.getConfigs().size(), + actualEntityDoc.getConfigs().size()); + Assert.assertEquals(expectedEntityDoc.getIsRelatedToEntities().size(), + actualEntityDoc.getIsRelatedToEntities().size()); + Assert.assertEquals(expectedEntityDoc.getRelatesToEntities().size(), + actualEntityDoc.getRelatesToEntities().size()); + } +} \ No newline at end of file
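
Reviewer note (illustrative, not part of the patch): the reader and writer above bridge the rxjava-based AsyncDocumentClient back to the timeline service's synchronous API by chaining subscribeOn(schedulerForBlockingWork) with toBlocking().single(), while createDatabase()/createCollection() wait via toCompletable().await(). The self-contained sketch below shows only that blocking-bridge pattern under stated assumptions: the endpoint, master key, database/collection names and the class name are placeholders invented for the example, and the API calls used are the same azure-cosmosdb 2.4.5 and rxjava 1.x calls already present in the patch.

// Minimal sketch of the query-then-block pattern used by CosmosDBDocumentStoreReader.
import com.microsoft.azure.cosmosdb.ConnectionPolicy;
import com.microsoft.azure.cosmosdb.ConsistencyLevel;
import com.microsoft.azure.cosmosdb.FeedOptions;
import com.microsoft.azure.cosmosdb.FeedResponse;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class CosmosDBBlockingQuerySketch {

  public static void main(String[] args) {
    // Same builder chain that DocumentStoreUtils.createCosmosDBAsyncClient() now uses.
    AsyncDocumentClient client = new AsyncDocumentClient.Builder()
        .withServiceEndpoint("https://example-account.documents.azure.com:443/") // placeholder
        .withMasterKeyOrResourceToken("<master-key>")                            // placeholder
        .withConnectionPolicy(ConnectionPolicy.GetDefault())
        .withConsistencyLevel(ConsistencyLevel.Session)
        .build();

    // Dedicated scheduler so the blocking wait stays off the client's own IO threads,
    // mirroring the schedulerForBlockingWork fields added by the patch.
    ExecutorService executor = Executors.newFixedThreadPool(2);
    Scheduler blockingScheduler = Schedulers.from(executor);

    String collectionLink = "/dbs/timeline_db/colls/timeline_entities"; // placeholder link
    List<String> ids = client
        .queryDocuments(collectionLink, "SELECT TOP 10 * FROM c", new FeedOptions())
        .map(FeedResponse::getResults)       // each emission is one page of documents
        .concatMap(Observable::from)         // flatten the pages into individual documents
        .map(document -> document.getId())   // project whatever field the caller needs
        .toList()
        .subscribeOn(blockingScheduler)
        .toBlocking()                        // synchronous callers wait here for the result
        .single();

    System.out.println("Fetched " + ids.size() + " document ids");

    client.close();
    executor.shutdown();
  }
}

Running the blocking wait on a scheduler backed by a dedicated ExecutorService (as the patch does) keeps synchronous callers from tying up the async client's netty event loops; the JVM shutdown hooks added in the reader and writer make sure those pools are stopped on exit.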