diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml
index 4a6cf7ab15e..18ccb243541 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml
@@ -28,7 +28,8 @@
<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
- <azure.documentdb.version>1.16.2</azure.documentdb.version>
+ <azure.documentdb.version>2.4.5</azure.documentdb.version>
+ <rxjava.version>1.3.8</rxjava.version>
@@ -44,11 +45,6 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.microsoft.azure</groupId>
- <artifactId>azure-documentdb</artifactId>
- <version>${azure.documentdb.version}</version>
- </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -84,6 +80,38 @@
+ <dependency>
+ <groupId>io.reactivex</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>${rxjava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-cosmosdb</artifactId>
+ <version>${azure.documentdb.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.reactivex</groupId>
+ <artifactId>rxjava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler-proxy</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java
index 4b14d47c6a3..6833502f83f 100755
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java
@@ -18,9 +18,9 @@
package org.apache.hadoop.yarn.server.timelineservice.documentstore;
-import com.microsoft.azure.documentdb.ConnectionPolicy;
-import com.microsoft.azure.documentdb.ConsistencyLevel;
-import com.microsoft.azure.documentdb.DocumentClient;
+import com.microsoft.azure.cosmosdb.ConnectionPolicy;
+import com.microsoft.azure.cosmosdb.ConsistencyLevel;
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
@@ -134,6 +134,10 @@ public final class DocumentStoreUtils {
* @return true if any of the strings is null or empty, else false
*/
public static boolean isNullOrEmpty(String...values) {
+ if (values == null || values.length == 0) {
+ return true;
+ }
+
for (String value : values) {
if (value == null || value.isEmpty()) {
return true;
@@ -143,15 +147,20 @@ public final class DocumentStoreUtils {
}
/**
- * Creates CosmosDB Document Client.
+ * Creates CosmosDB Async Document Client.
* @param conf
* to retrieve cosmos db endpoint and key
* @return async document client for CosmosDB
*/
- public static DocumentClient createCosmosDBClient(Configuration conf){
- return new DocumentClient(DocumentStoreUtils.getCosmosDBEndpoint(conf),
- DocumentStoreUtils.getCosmosDBMasterKey(conf),
- ConnectionPolicy.GetDefault(), ConsistencyLevel.Session);
+ public static AsyncDocumentClient createCosmosDBAsyncClient(
+ Configuration conf){
+ return new AsyncDocumentClient.Builder()
+ .withServiceEndpoint(DocumentStoreUtils.getCosmosDBEndpoint(conf))
+ .withMasterKeyOrResourceToken(
+ DocumentStoreUtils.getCosmosDBMasterKey(conf))
+ .withConnectionPolicy(ConnectionPolicy.GetDefault())
+ .withConsistencyLevel(ConsistencyLevel.Session)
+ .build();
}
/**
@@ -486,4 +495,4 @@ public final class DocumentStoreUtils {
return false;
}
}
-}
\ No newline at end of file
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java
index cce64ab5f1e..6a46d08764e 100755
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java
@@ -30,7 +30,6 @@ import java.util.Map;
public class TimelineEventSubDoc {
private final TimelineEvent timelineEvent;
- private boolean valid;
public TimelineEventSubDoc() {
timelineEvent = new TimelineEvent();
@@ -51,11 +50,7 @@ public class TimelineEventSubDoc {
public boolean isValid() {
return timelineEvent.isValid();
}
-
- public void setValid(boolean valid) {
- this.valid = valid;
- }
-
+
public long getTimestamp() {
return timelineEvent.getTimestamp();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java
index f7d078f6890..c0a569baa12 100755
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java
@@ -31,7 +31,6 @@ import java.util.TreeMap;
public class TimelineMetricSubDoc {
private final TimelineMetric timelineMetric;
- private boolean valid;
private long singleDataTimestamp;
private Number singleDataValue = 0;
@@ -41,7 +40,6 @@ public class TimelineMetricSubDoc {
public TimelineMetricSubDoc(TimelineMetric timelineMetric) {
this.timelineMetric = timelineMetric;
- this.valid = timelineMetric.isValid();
if (timelineMetric.getType() == TimelineMetric.Type.SINGLE_VALUE &&
timelineMetric.getValues().size() > 0) {
this.singleDataTimestamp = timelineMetric.getSingleDataTimestamp();
@@ -130,10 +128,6 @@ public class TimelineMetricSubDoc {
timelineMetric.setType(metricType);
}
- public void setValid(boolean valid) {
- this.valid = valid;
- }
-
public boolean isValid() {
return (timelineMetric.getId() != null);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java
index 834ebd55aee..1df89dee8b8 100755
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java
@@ -18,9 +18,11 @@
package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.cosmosdb;
-import com.microsoft.azure.documentdb.Document;
-import com.microsoft.azure.documentdb.DocumentClient;
-import com.microsoft.azure.documentdb.FeedOptions;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import com.microsoft.azure.cosmosdb.FeedOptions;
+import com.microsoft.azure.cosmosdb.FeedResponse;
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
@@ -30,12 +32,14 @@ import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentS
import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DocumentStoreReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.schedulers.Schedulers;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
@@ -49,7 +53,7 @@ public class CosmosDBDocumentStoreReader
.getLogger(CosmosDBDocumentStoreReader.class);
private static final int DEFAULT_DOCUMENTS_SIZE = 1;
- private static volatile DocumentClient client;
+ private static AsyncDocumentClient client;
private final String databaseName;
private final static String COLLECTION_LINK = "/dbs/%s/colls/%s";
private final static String SELECT_TOP_FROM_COLLECTION = "SELECT TOP %d * " +
@@ -66,17 +70,24 @@ public class CosmosDBDocumentStoreReader
"\"%s\") ";
private final static String ORDER_BY_CLAUSE = " ORDER BY c.createdTime";
+ // thread pool sized to half of the available processors in the JVM
+ private static ExecutorService executorService = Executors.newFixedThreadPool(
+ Runtime.getRuntime().availableProcessors() / 2);
+ private static Scheduler schedulerForBlockingWork =
+ Schedulers.from(executorService);
+
public CosmosDBDocumentStoreReader(Configuration conf) {
LOG.info("Initializing Cosmos DB DocumentStoreReader...");
databaseName = DocumentStoreUtils.getCosmosDBDatabaseName(conf);
- // making CosmosDB Client Singleton
+ initCosmosDBClient(conf);
+ }
+
+ private synchronized void initCosmosDBClient(Configuration conf) {
+ // making CosmosDB Async Client Singleton
if (client == null) {
- synchronized (this) {
- if (client == null) {
- LOG.info("Creating Cosmos DB Client...");
- client = DocumentStoreUtils.createCosmosDBClient(conf);
- }
- }
+ LOG.info("Creating Cosmos DB Reader Async Client...");
+ client = DocumentStoreUtils.createCosmosDBAsyncClient(conf);
+ addShutdownHook();
}
}
@@ -104,15 +115,16 @@ public class CosmosDBDocumentStoreReader
LOG.debug("Querying Collection : {} , with query {}", collectionName,
sqlQuery);
- Set<String> entityTypes = new HashSet<>();
- Iterator<Document> documentIterator = client.queryDocuments(
+ return Sets.newHashSet(client.queryDocuments(
String.format(COLLECTION_LINK, databaseName, collectionName),
- sqlQuery, null).getQueryIterator();
- while (documentIterator.hasNext()) {
- Document document = documentIterator.next();
- entityTypes.add(document.getString(ENTITY_TYPE_COLUMN));
- }
- return entityTypes;
+ sqlQuery, new FeedOptions())
+ .map(FeedResponse::getResults) // Map the page to the list of documents
+ .concatMap(Observable::from)
+ .map(document -> String.valueOf(document.get(ENTITY_TYPE_COLUMN)))
+ .toList()
+ .subscribeOn(schedulerForBlockingWork)
+ .toBlocking()
+ .single());
}
@Override
@@ -133,25 +145,25 @@ public class CosmosDBDocumentStoreReader
final long maxDocumentsSize) {
final String sqlQuery = buildQueryWithPredicates(context, collectionName,
maxDocumentsSize);
- List<TimelineDoc> timelineDocs = new ArrayList<>();
LOG.debug("Querying Collection : {} , with query {}", collectionName,
sqlQuery);
- FeedOptions feedOptions = new FeedOptions();
- feedOptions.setPageSize((int) maxDocumentsSize);
- Iterator<Document> documentIterator = client.queryDocuments(
- String.format(COLLECTION_LINK, databaseName, collectionName),
- sqlQuery, feedOptions).getQueryIterator();
- while (documentIterator.hasNext()) {
- Document document = documentIterator.next();
- TimelineDoc resultDoc = document.toObject(docClass);
- if (resultDoc.getCreatedTime() == 0 &&
- document.getTimestamp() != null) {
- resultDoc.setCreatedTime(document.getTimestamp().getTime());
- }
- timelineDocs.add(resultDoc);
- }
- return timelineDocs;
+ return client.queryDocuments(String.format(COLLECTION_LINK,
+ databaseName, collectionName), sqlQuery, new FeedOptions())
+ .map(FeedResponse::getResults) // Map the page to the list of documents
+ .concatMap(Observable::from)
+ .map(document -> {
+ TimelineDoc resultDoc = document.toObject(docClass);
+ if (resultDoc.getCreatedTime() == 0 &&
+ document.getTimestamp() != null) {
+ resultDoc.setCreatedTime(document.getTimestamp().getTime());
+ }
+ return resultDoc;
+ })
+ .toList()
+ .subscribeOn(schedulerForBlockingWork)
+ .toBlocking()
+ .single();
}
private String buildQueryWithPredicates(TimelineReaderContext context,
@@ -168,33 +180,34 @@ public class CosmosDBDocumentStoreReader
return addPredicates(context, collectionName, queryStrBuilder);
}
- private String addPredicates(TimelineReaderContext context,
+ @VisibleForTesting
+ String addPredicates(TimelineReaderContext context,
String collectionName, StringBuilder queryStrBuilder) {
boolean hasPredicate = false;
queryStrBuilder.append(WHERE_CLAUSE);
- if (context.getClusterId() != null) {
+ if (!DocumentStoreUtils.isNullOrEmpty(context.getClusterId())) {
hasPredicate = true;
queryStrBuilder.append(String.format(CONTAINS_FUNC_FOR_ID,
context.getClusterId()));
}
- if (context.getUserId() != null) {
+ if (!DocumentStoreUtils.isNullOrEmpty(context.getUserId())) {
hasPredicate = true;
queryStrBuilder.append(AND_OPERATOR)
.append(String.format(CONTAINS_FUNC_FOR_ID, context.getUserId()));
}
- if (context.getFlowName() != null) {
+ if (!DocumentStoreUtils.isNullOrEmpty(context.getFlowName())) {
hasPredicate = true;
queryStrBuilder.append(AND_OPERATOR)
.append(String.format(CONTAINS_FUNC_FOR_ID, context.getFlowName()));
}
- if (context.getAppId() != null) {
+ if (!DocumentStoreUtils.isNullOrEmpty(context.getAppId())) {
hasPredicate = true;
queryStrBuilder.append(AND_OPERATOR)
.append(String.format(CONTAINS_FUNC_FOR_ID, context.getAppId()));
}
- if (context.getEntityId() != null) {
+ if (!DocumentStoreUtils.isNullOrEmpty(context.getEntityId())) {
hasPredicate = true;
queryStrBuilder.append(AND_OPERATOR)
.append(String.format(CONTAINS_FUNC_FOR_ID, context.getEntityId()));
@@ -204,7 +217,7 @@ public class CosmosDBDocumentStoreReader
queryStrBuilder.append(AND_OPERATOR)
.append(String.format(CONTAINS_FUNC_FOR_ID, context.getFlowRunId()));
}
- if (context.getEntityType() != null){
+ if (!DocumentStoreUtils.isNullOrEmpty(context.getEntityType())){
hasPredicate = true;
queryStrBuilder.append(AND_OPERATOR)
.append(String.format(CONTAINS_FUNC_FOR_TYPE,
@@ -224,9 +237,17 @@ public class CosmosDBDocumentStoreReader
@Override
public synchronized void close() {
if (client != null) {
- LOG.info("Closing Cosmos DB Client...");
+ LOG.info("Closing Cosmos DB Reader Async Client...");
client.close();
client = null;
}
}
-}
\ No newline at end of file
+
+ private void addShutdownHook() {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+ }));
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java
index 16ed207be9e..32965840357 100755
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java
@@ -19,14 +19,20 @@
package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb;
-import com.microsoft.azure.documentdb.AccessCondition;
-import com.microsoft.azure.documentdb.AccessConditionType;
-import com.microsoft.azure.documentdb.Database;
-import com.microsoft.azure.documentdb.Document;
-import com.microsoft.azure.documentdb.DocumentClient;
-import com.microsoft.azure.documentdb.DocumentClientException;
-import com.microsoft.azure.documentdb.DocumentCollection;
-import com.microsoft.azure.documentdb.RequestOptions;
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.azure.cosmosdb.AccessCondition;
+import com.microsoft.azure.cosmosdb.AccessConditionType;
+import com.microsoft.azure.cosmosdb.Database;
+import com.microsoft.azure.cosmosdb.Document;
+import com.microsoft.azure.cosmosdb.DocumentClientException;
+import com.microsoft.azure.cosmosdb.DocumentCollection;
+import com.microsoft.azure.cosmosdb.FeedResponse;
+import com.microsoft.azure.cosmosdb.RequestOptions;
+import com.microsoft.azure.cosmosdb.ResourceResponse;
+import com.microsoft.azure.cosmosdb.SqlParameter;
+import com.microsoft.azure.cosmosdb.SqlParameterCollection;
+import com.microsoft.azure.cosmosdb.SqlQuerySpec;
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.server.timelineservice.metrics.PerNodeAggTimelineCollectorMetrics;
@@ -40,6 +46,13 @@ import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentS
import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* This is the Document Store Writer implementation for
@@ -51,79 +64,102 @@ public class CosmosDBDocumentStoreWriter
private static final Logger LOG = LoggerFactory
.getLogger(CosmosDBDocumentStoreWriter.class);
- private static volatile DocumentClient client;
private final String databaseName;
private static final PerNodeAggTimelineCollectorMetrics METRICS =
PerNodeAggTimelineCollectorMetrics.getInstance();
+
+ private static AsyncDocumentClient client;
+ // creating thread pool of size equal to number of collection types
+ private ExecutorService executorService =
+ Executors.newFixedThreadPool(CollectionType.values().length);
+ private Scheduler schedulerForBlockingWork =
+ Schedulers.from(executorService);
+
private static final String DATABASE_LINK = "/dbs/%s";
private static final String COLLECTION_LINK = DATABASE_LINK + "/colls/%s";
private static final String DOCUMENT_LINK = COLLECTION_LINK + "/docs/%s";
+ private static final String ID = "@id";
+ private static final String QUERY_COLLECTION_IF_EXISTS = "SELECT * FROM r " +
+ "where r.id = " + ID;
public CosmosDBDocumentStoreWriter(Configuration conf) {
LOG.info("Initializing Cosmos DB DocumentStoreWriter...");
databaseName = DocumentStoreUtils.getCosmosDBDatabaseName(conf);
- // making CosmosDB Client Singleton
+ initCosmosDBClient(conf);
+ }
+
+ private synchronized void initCosmosDBClient(Configuration conf) {
+ // making CosmosDB Async Client Singleton
if (client == null) {
- synchronized (this) {
- if (client == null) {
- LOG.info("Creating Cosmos DB Client...");
- client = DocumentStoreUtils.createCosmosDBClient(conf);
- }
- }
+ LOG.info("Creating Cosmos DB Writer Async Client...");
+ client = DocumentStoreUtils.createCosmosDBAsyncClient(conf);
+ addShutdownHook();
}
}
@Override
public void createDatabase() {
- try {
- client.readDatabase(String.format(
- DATABASE_LINK, databaseName), new RequestOptions());
- LOG.info("Database {} already exists.", databaseName);
- } catch (DocumentClientException docExceptionOnRead) {
- if (docExceptionOnRead.getStatusCode() == 404) {
- LOG.info("Creating new Database : {}", databaseName);
- Database databaseDefinition = new Database();
- databaseDefinition.setId(databaseName);
- try {
- client.createDatabase(databaseDefinition, new RequestOptions());
- } catch (DocumentClientException docExceptionOnCreate) {
- LOG.error("Unable to create new Database : {}", databaseName,
- docExceptionOnCreate);
- }
- } else {
- LOG.error("Error while reading Database : {}", databaseName,
- docExceptionOnRead);
- }
- }
+ Observable<ResourceResponse<Database>> databaseReadObs =
+ client.readDatabase(String.format(DATABASE_LINK, databaseName), null);
+
+ Observable<ResourceResponse<Database>> databaseExistenceObs =
+ databaseReadObs
+ .doOnNext(databaseResourceResponse ->
+ LOG.info("Database {} already exists.", databaseName))
+ .onErrorResumeNext(throwable -> {
+ // if the database doesn't exist,
+ // readDatabase() will result in a 404 error
+ if (throwable instanceof DocumentClientException) {
+ DocumentClientException de =
+ (DocumentClientException) throwable;
+ if (de.getStatusCode() == 404) {
+ // if the database doesn't exist, create it.
+ LOG.info("Creating new Database : {}", databaseName);
+
+ Database dbDefinition = new Database();
+ dbDefinition.setId(databaseName);
+
+ return client.createDatabase(dbDefinition, null);
+ }
+ }
+ // some unexpected failure in reading database happened.
+ // pass the error up.
+ LOG.error("Reading database : {} if it exists failed.",
+ databaseName, throwable);
+ return Observable.error(throwable);
+ });
+ // wait for completion
+ databaseExistenceObs.toCompletable().await();
}
@Override
public void createCollection(final String collectionName) {
LOG.info("Creating Timeline Collection : {} for Database : {}",
collectionName, databaseName);
- try {
- client.readCollection(String.format(COLLECTION_LINK, databaseName,
- collectionName), new RequestOptions());
- LOG.info("Collection {} already exists.", collectionName);
- } catch (DocumentClientException docExceptionOnRead) {
- if (docExceptionOnRead.getStatusCode() == 404) {
- DocumentCollection collection = new DocumentCollection();
- collection.setId(collectionName);
- LOG.info("Creating collection {} under Database {}",
- collectionName, databaseName);
- try {
- client.createCollection(
- String.format(DATABASE_LINK, databaseName),
- collection, new RequestOptions());
- } catch (DocumentClientException docExceptionOnCreate) {
- LOG.error("Unable to create Collection : {} under Database : {}",
- collectionName, databaseName, docExceptionOnCreate);
- }
- } else {
- LOG.error("Error while reading Collection : {} under Database : {}",
- collectionName, databaseName, docExceptionOnRead);
- }
- }
+ client.queryCollections(String.format(DATABASE_LINK, databaseName),
+ new SqlQuerySpec(QUERY_COLLECTION_IF_EXISTS,
+ new SqlParameterCollection(
+ new SqlParameter(ID, collectionName))), null)
+ .single() // there should be single page of result
+ .flatMap((Func1<FeedResponse<DocumentCollection>, Observable<ResourceResponse<DocumentCollection>>>)
+ page -> {
+ if (page.getResults().isEmpty()) {
+ // if there is no matching collection create one.
+ DocumentCollection collection = new DocumentCollection();
+ collection.setId(collectionName);
+ LOG.info("Creating collection {}", collectionName);
+ return client.createCollection(
+ String.format(DATABASE_LINK, databaseName),
+ collection, null);
+ } else {
+ // collection already exists, nothing else to be done.
+ LOG.info("Collection {} already exists.", collectionName);
+ return Observable.empty();
+ }
+ })
+ .doOnError(throwable -> LOG.error("Unable to create collection : {}",
+ collectionName, throwable))
+ .toCompletable().await();
}
@Override
@@ -156,32 +192,40 @@ public class CosmosDBDocumentStoreWriter
AccessCondition accessCondition = new AccessCondition();
StringBuilder eTagStrBuilder = new StringBuilder();
- TimelineDoc updatedTimelineDoc = applyUpdatesOnPrevDoc(collectionType,
+ final TimelineDoc updatedTimelineDoc = applyUpdatesOnPrevDoc(collectionType,
timelineDoc, eTagStrBuilder);
accessCondition.setCondition(eTagStrBuilder.toString());
accessCondition.setType(AccessConditionType.IfMatch);
requestOptions.setAccessCondition(accessCondition);
- try {
- client.upsertDocument(collectionLink, updatedTimelineDoc,
- requestOptions, true);
+ ResourceResponse<Document> resourceResponse =
+ client.upsertDocument(collectionLink, updatedTimelineDoc,
+ requestOptions, true)
+ .subscribeOn(schedulerForBlockingWork)
+ .doOnError(throwable ->
+ LOG.error("Error while upserting Collection : {} " +
+ "with Doc Id : {} under Database : {}",
+ collectionType.getCollectionName(),
+ updatedTimelineDoc.getId(), databaseName, throwable))
+ .toBlocking()
+ .single();
+
+ if (resourceResponse.getStatusCode() == 409) {
+ LOG.warn("There was a conflict while upserting, hence retrying...",
+ resourceResponse);
+ upsertDocument(collectionType, updatedTimelineDoc);
+ } else if (resourceResponse.getStatusCode() >= 200 && resourceResponse
+ .getStatusCode() < 300) {
LOG.debug("Successfully wrote doc with id : {} and type : {} under " +
"Database : {}", timelineDoc.getId(), timelineDoc.getType(),
databaseName);
- } catch (DocumentClientException e) {
- if (e.getStatusCode() == 409) {
- LOG.warn("There was a conflict while upserting, hence retrying...", e);
- upsertDocument(collectionType, updatedTimelineDoc);
- }
- LOG.error("Error while upserting Collection : {} with Doc Id : {} under" +
- " Database : {}", collectionType.getCollectionName(),
- updatedTimelineDoc.getId(), databaseName, e);
}
}
+ @VisibleForTesting
@SuppressWarnings("unchecked")
- private TimelineDoc applyUpdatesOnPrevDoc(CollectionType collectionType,
+ TimelineDoc applyUpdatesOnPrevDoc(CollectionType collectionType,
TimelineDoc timelineDoc, StringBuilder eTagStrBuilder) {
TimelineDoc prevDocument = fetchLatestDoc(collectionType,
timelineDoc.getId(), eTagStrBuilder);
@@ -192,14 +236,15 @@ public class CosmosDBDocumentStoreWriter
return timelineDoc;
}
+ @VisibleForTesting
@SuppressWarnings("unchecked")
- private TimelineDoc fetchLatestDoc(final CollectionType collectionType,
+ TimelineDoc fetchLatestDoc(final CollectionType collectionType,
final String documentId, StringBuilder eTagStrBuilder) {
final String documentLink = String.format(DOCUMENT_LINK, databaseName,
collectionType.getCollectionName(), documentId);
try {
Document latestDocument = client.readDocument(documentLink, new
- RequestOptions()).getResource();
+ RequestOptions()).toBlocking().single().getResource();
TimelineDoc timelineDoc;
switch (collectionType) {
case FLOW_RUN:
@@ -227,9 +272,17 @@ public class CosmosDBDocumentStoreWriter
@Override
public synchronized void close() {
if (client != null) {
- LOG.info("Closing Cosmos DB Client...");
+ LOG.info("Closing Cosmos DB Writer Async Client...");
client.close();
client = null;
}
}
+
+ private void addShutdownHook() {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+ }));
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/TestCosmosDBDocumentStoreReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/TestCosmosDBDocumentStoreReader.java
new file mode 100644
index 00000000000..416f1b84d99
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/TestCosmosDBDocumentStoreReader.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.cosmosdb;
+
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test case for {@link CosmosDBDocumentStoreReader}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DocumentStoreUtils.class)
+public class TestCosmosDBDocumentStoreReader {
+
+ @Before
+ public void setUp(){
+ AsyncDocumentClient asyncDocumentClient =
+ Mockito.mock(AsyncDocumentClient.class);
+ PowerMockito.mockStatic(DocumentStoreUtils.class);
+ PowerMockito.when(DocumentStoreUtils.getCosmosDBDatabaseName(
+ ArgumentMatchers.any(Configuration.class)))
+ .thenReturn("FooBar");
+ PowerMockito.when(DocumentStoreUtils.createCosmosDBAsyncClient(
+ ArgumentMatchers.any(Configuration.class)))
+ .thenReturn(asyncDocumentClient);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFailureOnEmptyPredicates() {
+ PowerMockito.when(DocumentStoreUtils.isNullOrEmpty(
+ ArgumentMatchers.any()))
+ .thenReturn(Boolean.TRUE);
+
+ CosmosDBDocumentStoreReader cosmosDBDocumentStoreReader =
+ new CosmosDBDocumentStoreReader(null);
+ cosmosDBDocumentStoreReader.addPredicates(
+ new TimelineReaderContext(null, "", "",
+ null, "", "", null),
+ "DummyCollection", new StringBuilder());
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/MockedCosmosDBDocumentStoreWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/MockedCosmosDBDocumentStoreWriter.java
new file mode 100644
index 00000000000..ab0065eecae
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/MockedCosmosDBDocumentStoreWriter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreTestUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+
+import java.io.IOException;
+
+/**
+ * This is a mocked class for {@link CosmosDBDocumentStoreWriter}.
+ */
+public class MockedCosmosDBDocumentStoreWriter
+ extends CosmosDBDocumentStoreWriter {
+
+ public MockedCosmosDBDocumentStoreWriter(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ TimelineDocument fetchLatestDoc(CollectionType collectionType,
+ String documentId, StringBuilder eTagStrBuilder) {
+ try {
+ return DocumentStoreTestUtils.bakeTimelineEntityDoc();
+ } catch (IOException e) {
+ return null;
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/TestCosmosDBDocumentStoreWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/TestCosmosDBDocumentStoreWriter.java
new file mode 100644
index 00000000000..783d04ecc39
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/TestCosmosDBDocumentStoreWriter.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb;
+
+import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreTestUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+
+/**
+ * Test case for {@link CosmosDBDocumentStoreWriter}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DocumentStoreUtils.class)
+public class TestCosmosDBDocumentStoreWriter {
+
+ @Before
+ public void setUp() {
+ AsyncDocumentClient asyncDocumentClient =
+ Mockito.mock(AsyncDocumentClient.class);
+ PowerMockito.mockStatic(DocumentStoreUtils.class);
+ PowerMockito.when(DocumentStoreUtils.getCosmosDBDatabaseName(
+ ArgumentMatchers.any(Configuration.class)))
+ .thenReturn("FooBar");
+ PowerMockito.when(DocumentStoreUtils.createCosmosDBAsyncClient(
+ ArgumentMatchers.any(Configuration.class)))
+ .thenReturn(asyncDocumentClient);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void applyingUpdatesOnPrevDocTest() throws IOException {
+ MockedCosmosDBDocumentStoreWriter documentStoreWriter =
+ new MockedCosmosDBDocumentStoreWriter(null);
+
+ TimelineEntityDocument actualEntityDoc =
+ new TimelineEntityDocument();
+ TimelineEntityDocument expectedEntityDoc =
+ DocumentStoreTestUtils.bakeTimelineEntityDoc();
+
+ Assert.assertEquals(1, actualEntityDoc.getInfo().size());
+ Assert.assertEquals(0, actualEntityDoc.getMetrics().size());
+ Assert.assertEquals(0, actualEntityDoc.getEvents().size());
+ Assert.assertEquals(0, actualEntityDoc.getConfigs().size());
+ Assert.assertEquals(0,
+ actualEntityDoc.getIsRelatedToEntities().size());
+ Assert.assertEquals(0, actualEntityDoc.
+ getRelatesToEntities().size());
+
+ actualEntityDoc = (TimelineEntityDocument) documentStoreWriter
+ .applyUpdatesOnPrevDoc(CollectionType.ENTITY,
+ actualEntityDoc, null);
+
+ Assert.assertEquals(expectedEntityDoc.getInfo().size(),
+ actualEntityDoc.getInfo().size());
+ Assert.assertEquals(expectedEntityDoc.getMetrics().size(),
+ actualEntityDoc.getMetrics().size());
+ Assert.assertEquals(expectedEntityDoc.getEvents().size(),
+ actualEntityDoc.getEvents().size());
+ Assert.assertEquals(expectedEntityDoc.getConfigs().size(),
+ actualEntityDoc.getConfigs().size());
+ Assert.assertEquals(expectedEntityDoc.getIsRelatedToEntities().size(),
+ actualEntityDoc.getIsRelatedToEntities().size());
+ Assert.assertEquals(expectedEntityDoc.getRelatesToEntities().size(),
+ actualEntityDoc.getRelatesToEntities().size());
+ }
+}
\ No newline at end of file