Issue #56 - threaded sampling with auto-terminate upon reaching end of records

Joshua Darnell 2021-04-29 03:18:40 -07:00
parent b6ed5308ea
commit d6d0799fb5
2 changed files with 86 additions and 45 deletions
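In outline, the change below samples each RESO resource on its own worker via a parallel stream and stops paging a resource as soon as the server runs out of records, rather than looping until the target count is hit. A minimal, self-contained sketch of that pattern; fetchPage and the resource names are placeholders, not the project's API:

    // Hypothetical sketch of threaded sampling with auto-terminate.
    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class SamplerSketch {
      // Stand-in for one OData page fetch; sometimes returns an empty page.
      static List<String> fetchPage(String resource) {
        return ThreadLocalRandom.current().nextBoolean() ? List.of("record") : List.of();
      }

      public static void main(String[] args) {
        List<String> resources = List.of("Property", "Member", "Office");
        resources.stream().parallel().forEach(resource -> {
          AtomicBoolean noMoreRecords = new AtomicBoolean(false);
          int processed = 0;
          final int target = 10000;
          while (!noMoreRecords.get() && processed < target) {
            List<String> page = fetchPage(resource);
            if (page.isEmpty()) {
              noMoreRecords.set(true); // auto-terminate: no more records on the server
            } else {
              processed += page.size();
            }
          }
          System.out.println(resource + ": " + processed + " records sampled");
        });
      }
    }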


@@ -23,12 +23,12 @@ Feature: IDX Payload Endorsement (Web API)
  Scenario: Standard Resource Sampling - Request Data from Each Server Resource
    Given that metadata have been requested from the server
    And the metadata contains RESO Standard Resources
-    When 1000 records are sampled from each RESO Standard resource in the server metadata
+    When 10000 records are sampled from each RESO Standard resource in the server metadata
    Then each record MUST have the string version of the Primary Key and ModificationTimestamp field
    And the data MUST match what is advertised in the metadata

  @non-standard-resource-sampling @idx-payload-endorsement
  Scenario: Non Standard Resource Sampling - Request Data from Each Server Resource
    Given that metadata have been requested from the server
-    When 1000 records are sampled from each non standard resource in the server metadata
+    When 10000 records are sampled from each non standard resource in the server metadata
    Then the data MUST match what is advertised in the metadata
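On the Java side these steps are bound by Cucumber step definitions. A hypothetical sketch of the glue, assuming cucumber-java's {int} expression parameter; the project's real step-definition class may be organized differently:

    import io.cucumber.java.en.When;

    public class IDXPayloadSteps {
      @When("{int} records are sampled from each RESO Standard resource in the server metadata")
      public void recordsAreSampledFromEachStandardResource(int numRecords) {
        // would delegate to the sampling logic in IDXPayload shown below
      }
    }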


@@ -13,6 +13,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.olingo.client.api.data.ResWrap;
import org.apache.olingo.commons.api.data.EntityCollection;
import org.apache.olingo.commons.api.edm.EdmKeyPropertyRef;
+import org.apache.olingo.commons.api.edm.EdmPrimitiveTypeKind;
import org.apache.olingo.commons.api.format.ContentType;
import org.reso.certification.containers.WebAPITestContainer;
import org.reso.commander.common.DataDictionaryMetadata;
@@ -120,7 +121,7 @@ public class IDXPayload {
   */
  AtomicReference<PayloadSample> result = new AtomicReference<>();
-  standardResources.get().parallelStream().forEach(resourceName -> {
+  standardResources.get().stream().parallel().forEach(resourceName -> {
    result.set(fetchAndProcessRecords(resourceName, numRecords));
  });
}
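For a Collection, parallelStream() and stream().parallel() are equivalent: both yield a parallel stream whose terminal operations run on the common fork-join pool, so the rewrite here is stylistic rather than behavioral. A tiny demonstration:

    import java.util.List;

    public class ParallelEquivalence {
      public static void main(String[] args) {
        List<String> resources = List.of("Property", "Member", "Office");
        // Both forms execute forEach on the common fork-join pool:
        resources.parallelStream().forEach(r -> System.out.println("A: " + r));
        resources.stream().parallel().forEach(r -> System.out.println("B: " + r));
      }
    }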
@@ -132,16 +133,28 @@
ODataTransportWrapper transportWrapper;
final String TIMESTAMP_STANDARD_FIELD = "ModificationTimestamp";
boolean hasStandardTimestampField = false;
-if (container.get().getEdm().getEntityContainer().getEntitySet(resourceName).getEntityType().getProperty("ModificationTimestamp") == null)
-  return null;
+List<String> timestampCandidateFields = new LinkedList<>();
+if (container.get().getEdm().getEntityContainer().getEntitySet(resourceName).getEntityType().getProperty(TIMESTAMP_STANDARD_FIELD) == null) {
+  LOG.info("Could not find " + MODIFICATION_TIMESTAMP_FIELD + " in the " + resourceName + " resource!");
+  LOG.info("Searching for suitable timestamp fields...");
+  container.get().getEdm().getEntityContainer().getEntitySet(resourceName).getEntityType().getPropertyNames().forEach(propertyName -> {
+    if (container.get().getEdm().getEntityContainer().getEntitySet(resourceName).getEntityType().getProperty(propertyName).getType().getFullQualifiedName().getFullQualifiedNameAsString().contentEquals(EdmPrimitiveTypeKind.DateTimeOffset.getFullQualifiedName().getFullQualifiedNameAsString())) {
+      LOG.info("Found DateTimeOffset field " + propertyName + " in the " + resourceName + " resource!");
+      timestampCandidateFields.add(propertyName);
+    }
+  });
+} else {
+  hasStandardTimestampField = true;
+}

-//all RESO Resources MUST have ModificationTimestamp or previous test steps will fail
-final String REQUEST_URI_TEMPLATE = "?$filter=" + TIMESTAMP_STANDARD_FIELD
-    + " lt '%s'&$orderby=" + TIMESTAMP_STANDARD_FIELD + " desc";
+//consider parameterizing the timestamp field and let applicant pass them in the config
+final String REQUEST_URI_TEMPLATE = "?$filter=%s"
+    + " lt %s&$orderby=%s desc";

int recordsFetched;
String requestUri;
ResWrap<EntityCollection> entityCollectionResWrap = null;
//Map of String Resource Names to Key-Mapped Hash of Field and SHA value
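With the template parameterized, the chosen timestamp field is spliced into both the $filter and $orderby clauses, and the single quotes around the timestamp are gone because OData v4 DateTimeOffset literals are written unquoted. A sketch of the URI this produces, using a hypothetical service root:

    import java.time.ZoneOffset;
    import java.time.ZonedDateTime;
    import java.time.format.DateTimeFormatter;

    public class UriTemplateExample {
      public static void main(String[] args) {
        final String REQUEST_URI_TEMPLATE = "?$filter=%s lt %s&$orderby=%s desc";
        final String timestampField = "ModificationTimestamp";
        ZonedDateTime lastFetchedDate = ZonedDateTime.of(2021, 4, 29, 0, 0, 0, 0, ZoneOffset.UTC);
        String uri = "https://api.example.com/odata/Property" // hypothetical service root
            + String.format(REQUEST_URI_TEMPLATE, timestampField,
                lastFetchedDate.format(DateTimeFormatter.ISO_INSTANT), timestampField);
        System.out.println(uri);
        // => https://api.example.com/odata/Property?$filter=ModificationTimestamp lt 2021-04-29T00:00:00Z&$orderby=ModificationTimestamp desc
      }
    }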
@@ -155,11 +168,25 @@
int recordsProcessed = 0;
int numRetries = 0;
+int lastTimestampCandidateIndex = 0;
+String timestampField;
+AtomicBoolean noMoreRecords = new AtomicBoolean(false);

do {
+  if (hasStandardTimestampField) {
+    timestampField = TIMESTAMP_STANDARD_FIELD;
+  } else if (timestampCandidateFields.size() > 0 && lastTimestampCandidateIndex < timestampCandidateFields.size()) {
+    timestampField = timestampCandidateFields.get(lastTimestampCandidateIndex++);
+  } else {
+    scenario.log("ERROR: Could not find a suitable timestamp field in the " + resourceName + " resource to sample with...");
+    return null;
+  }

  requestUri = container.get().getCommander().getClient()
      .newURIBuilder(container.get().getServiceRoot())
      .appendEntitySetSegment(resourceName).build().toString()
-      + String.format(REQUEST_URI_TEMPLATE, lastFetchedDate.get().format(DateTimeFormatter.ISO_INSTANT));
+      + String.format(REQUEST_URI_TEMPLATE, timestampField, lastFetchedDate.get().format(DateTimeFormatter.ISO_INSTANT), timestampField);

  LOG.info("Making request to: " + requestUri);
  transportWrapper = container.get().getCommander().executeODataGetRequest(requestUri);
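The loop pages backwards through time: each request asks for records strictly older than the oldest timestamp seen so far, ordered descending, which is keyset pagination keyed on the timestamp field. A toy sketch of that cursor movement, with fetchOlderThan standing in for the OData request:

    import java.time.ZonedDateTime;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class TimestampPagingSketch {
      // Stand-in for one page: records strictly older than 'before', newest first.
      static List<ZonedDateTime> fetchOlderThan(ZonedDateTime before) {
        return Stream.iterate(before.minusDays(1), t -> t.minusDays(1))
            .limit(3)
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        ZonedDateTime lastFetchedDate = ZonedDateTime.now();
        for (int page = 0; page < 3; page++) {
          for (ZonedDateTime t : fetchOlderThan(lastFetchedDate)) {
            // advance the cursor to the oldest timestamp seen, as the
            // ModificationTimestamp handling below does
            if (t.isBefore(lastFetchedDate)) lastFetchedDate = t;
          }
          System.out.println("next cursor: " + lastFetchedDate);
        }
      }
    }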
@@ -171,11 +198,21 @@
  }

  if (numRetries >= 3) {
-    failAndExitWithErrorMessage("Could not fetch records from the " + resourceName + " resource after 3 tries from the given URL: " + requestUri, scenario);
+    if (timestampCandidateFields.size() > 0 && (lastTimestampCandidateIndex < timestampCandidateFields.size())) {
+      scenario.log("Trying next candidate timestamp field: " + timestampCandidateFields.get(lastTimestampCandidateIndex));
+      numRetries = 0;
+    } else {
+      scenario.log("Could not fetch records from the " + resourceName + " resource after 3 tries from the given URL: " + requestUri);
+      break;
+    }
  }

-  if (transportWrapper.getHttpResponseCode() != null && transportWrapper.getHttpResponseCode() != 200) {
-    getDefaultErrorMessage("Request to", requestUri, "failed with response code", transportWrapper.getHttpResponseCode().toString());
+  if (transportWrapper.getHttpResponseCode() == null || transportWrapper.getHttpResponseCode() != 200) {
+    if (transportWrapper.getHttpResponseCode() == null) {
+      getDefaultErrorMessage("Request to", requestUri, "failed! No response code was provided. Check commander.log for any errors...");
+    } else {
+      getDefaultErrorMessage("Request to", requestUri, "failed with response code", transportWrapper.getHttpResponseCode().toString());
+    }
    break;
  } else {
    //need to look at the records from the response and get the lowest timestamp
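Two behavioral changes land in this hunk: after three failed attempts the sampler now rotates to the next candidate timestamp field and resets the retry counter instead of failing the scenario outright, and a null response code, which previously slipped past the error check, is now reported explicitly. A distilled, hypothetical sketch of that retry-then-fallback flow:

    import java.util.Iterator;
    import java.util.List;

    public class RetryWithFallbackSketch {
      // Stand-in request: returns an HTTP status code, or null if no response arrived.
      static Integer request(String field) {
        return "ModificationTimestamp".equals(field) ? null : 200;
      }

      public static void main(String[] args) {
        Iterator<String> candidates =
            List.of("ModificationTimestamp", "StatusChangeTimestamp").iterator();
        String field = candidates.next();
        int numRetries = 0;
        while (true) {
          Integer code = request(field);
          if (code != null && code == 200) {
            System.out.println("OK with " + field);
            break;
          }
          numRetries++;
          System.out.println(code == null
              ? "no response code; check the logs"
              : "failed with response code " + code);
          if (numRetries >= 3) {
            if (candidates.hasNext()) {
              field = candidates.next(); // fall back to the next timestamp field
              numRetries = 0;
            } else {
              System.out.println("out of candidate fields; giving up");
              break;
            }
          }
        }
      }
    }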
@@ -187,49 +224,53 @@
    LOG.info("Time taken: "
        + (transportWrapper.getElapsedTimeMillis() >= 1000 ? (transportWrapper.getElapsedTimeMillis() / 1000) + "s" : transportWrapper.getElapsedTimeMillis() + "ms"));

+    if (entityCollectionResWrap.getPayload().getEntities().size() > 0) {
      assert (keyFields.size() > 0) :
          getDefaultErrorMessage("no Key Fields found! Resources MUST have at least one key.");

      //we will always key the hash by the first key, the other key fields will still be there
      //for the MUST requirement checks
      final String keyField = keyFields.get(0).getName();
      LOG.info("Hashing " + resourceName + " payload values...");

      entityCollectionResWrap.getPayload().getEntities().forEach(entity -> {
        entity.getProperties().forEach(property -> {
          encodedSamples.get().computeIfAbsent(resourceName, k -> new LinkedHashMap<>());
          if (!encodedSamples.get().get(resourceName).containsKey(keyField)) {
            encodedSamples.get().get(resourceName).put(entity.getProperty(keyField).getValue().toString(), new LinkedList<>());
          }

          final Vector<String> fieldMeta = new Vector<>();
          fieldMeta.setSize(2);
          fieldMeta.set(0, property.getName());
          fieldMeta.set(1, property.getName().contentEquals(MODIFICATION_TIMESTAMP_FIELD) || keyFields.stream().reduce(false,
              (acc, f) -> acc && f.getName().contentEquals(property.getName()), Boolean::logicalAnd)
              ? property.getValue().toString() : (property.getValue() != null
              ? hashValues(property.getValue().toString()) : null));
          encodedSamples.get().get(resourceName).get(entity.getProperty(keyField).getValue().toString()).add(fieldMeta);

          if (property.getName().contentEquals(MODIFICATION_TIMESTAMP_FIELD)) {
            if (ZonedDateTime.parse(property.getValue().toString()).isBefore(lastFetchedDate.get())) {
              lastFetchedDate.set(ZonedDateTime.parse(property.getValue().toString()));
            }
          }
        });
      });
      LOG.info("Values encoded!");

      recordsProcessed += entityCollectionResWrap.getPayload().getEntities().size();
-      LOG.info("Records processed: " + recordsProcessed + " of " + targetRecordCount + "\n");
+      LOG.info("Records processed: " + recordsProcessed + ". Target record count: " + targetRecordCount + "\n");
+    } else {
+      LOG.info("All available records fetched! Total: " + recordsProcessed);
+      noMoreRecords.set(true);
+    }
  } catch (Exception ex) {
    getDefaultErrorMessage(ex.toString());
  }
}
-} while (recordsProcessed < targetRecordCount);
+} while (!noMoreRecords.get() && recordsProcessed < targetRecordCount);

return null;
}
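One detail in the hashing block reads oddly: the key-field membership test reduces over keyFields starting from false and combining with &&, which evaluates to false for every property, so key values other than ModificationTimestamp end up hashed rather than kept in the clear. If membership is the intent, Stream.anyMatch expresses it directly; a sketch under that assumption (ListingKey is an illustrative field name):

    import java.util.List;

    public class KeyFieldCheckSketch {
      public static void main(String[] args) {
        List<String> keyFieldNames = List.of("ListingKey");
        String propertyName = "ListingKey";

        // As written in the diff: an identity of 'false' ANDed with anything stays false.
        boolean asWritten = keyFieldNames.stream().reduce(false,
            (acc, name) -> acc && name.contentEquals(propertyName), Boolean::logicalAnd);

        // Presumed intent: is this property one of the key fields?
        boolean presumedIntent = keyFieldNames.stream()
            .anyMatch(name -> name.contentEquals(propertyName));

        System.out.println(asWritten + " vs " + presumedIntent); // prints: false vs true
      }
    }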