Issue #56 - Multi-threaded fetching and use of timezone offsets
This commit is contained in:
parent
554af35f17
commit
b6ed5308ea
|
@ -81,6 +81,8 @@ public final class WebAPITestContainer implements TestContainer {
|
|||
private final AtomicBoolean isUsingMetadataFile = new AtomicBoolean(false);
|
||||
|
||||
// request instance variables - these get resetMarkupBuffer with every request
|
||||
//TODO: refactor underlying response properties to use a ODataTransportWrapper (or any TransportWrapper)
|
||||
// and create the test container with the appropriate response of the transport wrapper
|
||||
private final AtomicReference<String> selectList = new AtomicReference<>();
|
||||
private final AtomicReference<ODataRawResponse> oDataRawResponse = new AtomicReference<>();
|
||||
private final AtomicReference<Request> request = new AtomicReference<>();
|
||||
|
|
|
@ -15,21 +15,22 @@ import org.apache.olingo.commons.api.data.EntityCollection;
|
|||
import org.apache.olingo.commons.api.edm.EdmKeyPropertyRef;
|
||||
import org.apache.olingo.commons.api.format.ContentType;
|
||||
import org.reso.certification.containers.WebAPITestContainer;
|
||||
import org.reso.commander.Commander;
|
||||
import org.reso.commander.common.DataDictionaryMetadata;
|
||||
import org.reso.models.ODataTransportWrapper;
|
||||
import org.reso.models.PayloadSample;
|
||||
import org.reso.models.Settings;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.google.common.hash.Hashing.sha256;
|
||||
import static junit.framework.TestCase.assertNotNull;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
import static org.reso.commander.common.ErrorMsg.getDefaultErrorMessage;
|
||||
import static org.reso.commander.common.TestUtils.failAndExitWithErrorMessage;
|
||||
|
@ -46,6 +47,8 @@ public class IDXPayload {
|
|||
|
||||
private final static AtomicReference<WebAPITestContainer> container = new AtomicReference<>();
|
||||
|
||||
private final static AtomicReference<Map<String, List<String>>> requests = new AtomicReference<>();
|
||||
|
||||
@Before
|
||||
public void beforeStep(Scenario scenario) {
|
||||
final String pathToRESOScript = System.getProperty("pathToRESOScript", null);
|
||||
|
@ -106,74 +109,128 @@ public class IDXPayload {
|
|||
scenario.log("No RESO Standard Resources to sample");
|
||||
assumeTrue(true);
|
||||
} else {
|
||||
final AtomicReference<ResWrap<EntityCollection>> entitySet = new AtomicReference<>();
|
||||
/*
|
||||
For each resource on the server
|
||||
1) read parameters from config
|
||||
2) generate k requests in order to sample n records
|
||||
3) run requests
|
||||
4) schema validation against metadata (when present)
|
||||
5) encode results (possibly serialize)
|
||||
6) generate reports
|
||||
*/
|
||||
|
||||
//Map of String Resource Names to Key-Mapped Hash of Field and SHA value
|
||||
final AtomicReference<Map<String, Map<String, List<Vector<String>>>>> encodedSamples = new AtomicReference<>(new LinkedHashMap<>());
|
||||
AtomicReference<PayloadSample> result = new AtomicReference<>();
|
||||
standardResources.get().parallelStream().forEach(resourceName -> {
|
||||
result.set(fetchAndProcessRecords(resourceName, numRecords));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
standardResources.get().forEach(resourceName -> {
|
||||
//create placeholder hashmap for this resource name
|
||||
encodedSamples.get().put(resourceName, new LinkedHashMap<>());
|
||||
PayloadSample fetchAndProcessRecords(String resourceName, int targetRecordCount) {
|
||||
final AtomicReference<ZonedDateTime> lastFetchedDate = new AtomicReference<>(ZonedDateTime.now()); //.minus(5, ChronoUnit.YEARS)
|
||||
final ZonedDateTime startDate = lastFetchedDate.get();
|
||||
|
||||
ODataTransportWrapper transportWrapper;
|
||||
final String TIMESTAMP_STANDARD_FIELD = "ModificationTimestamp";
|
||||
|
||||
if (container.get().getEdm().getEntityContainer().getEntitySet(resourceName).getEntityType().getProperty("ModificationTimestamp") == null)
|
||||
return null;
|
||||
|
||||
//consider parameterizing the timestamp field and let applicant pass them in the config
|
||||
//all RESO Resources MUST have ModificationTimestamp or previous test steps will fail
|
||||
final String REQUEST_URI_TEMPLATE = "?$filter=" + TIMESTAMP_STANDARD_FIELD
|
||||
+ " lt '%s'&$orderby=" + TIMESTAMP_STANDARD_FIELD + " desc";
|
||||
|
||||
int recordsFetched;
|
||||
String requestUri;
|
||||
ResWrap<EntityCollection> entityCollectionResWrap = null;
|
||||
//Map of String Resource Names to Key-Mapped Hash of Field and SHA value
|
||||
final AtomicReference<Map<String, Map<String, List<Vector<String>>>>> encodedSamples = new AtomicReference<>(new LinkedHashMap<>());
|
||||
|
||||
//assumes the 0th key is always used. TODO: determine if we need to scan the keys.
|
||||
final List<EdmKeyPropertyRef> keyFields = container.get().getEdm().getEntityContainer()
|
||||
.getEntitySet(resourceName).getEntityType().getKeyPropertyRefs();
|
||||
|
||||
LOG.info("Keys found: " + keyFields.stream().map(EdmKeyPropertyRef::getName).collect(Collectors.joining(", ")));
|
||||
|
||||
int recordsProcessed = 0;
|
||||
int numRetries = 0;
|
||||
do {
|
||||
requestUri = container.get().getCommander().getClient()
|
||||
.newURIBuilder(container.get().getServiceRoot())
|
||||
.appendEntitySetSegment(resourceName).build().toString()
|
||||
+ String.format(REQUEST_URI_TEMPLATE, lastFetchedDate.get().format(DateTimeFormatter.ISO_INSTANT));
|
||||
|
||||
LOG.info("Making request to: " + requestUri);
|
||||
transportWrapper = container.get().getCommander().executeODataGetRequest(requestUri);
|
||||
|
||||
if (recordsProcessed == 0 && transportWrapper.getResponseData() == null) {
|
||||
numRetries += 1;
|
||||
} else {
|
||||
numRetries = 0;
|
||||
}
|
||||
|
||||
if (numRetries >= 3) {
|
||||
failAndExitWithErrorMessage("Could not fetch records from the " + resourceName + " resource after 3 tries from the given URL: " + requestUri, scenario);
|
||||
}
|
||||
|
||||
if (transportWrapper.getHttpResponseCode() != null && transportWrapper.getHttpResponseCode() != 200) {
|
||||
getDefaultErrorMessage("Request to", requestUri, "failed with response code", transportWrapper.getHttpResponseCode().toString());
|
||||
break;
|
||||
} else {
|
||||
//need to look at the records from the response and get the lowest timestamp
|
||||
try {
|
||||
final URI uri = Commander.prepareURI(container.get().getCommander().getClient().newURIBuilder(
|
||||
container.get().getServiceRoot()).appendEntitySetSegment(resourceName).build().toString());
|
||||
|
||||
container.get().setRequestUri(uri);
|
||||
|
||||
assertNotNull(getDefaultErrorMessage("Request URI was null!"), uri);
|
||||
|
||||
LOG.info("Requesting " + resourceName + " data from: " + uri.toString());
|
||||
container.get().executePreparedRawGetRequest();
|
||||
|
||||
entitySet.set(container.get().getCommander().getClient()
|
||||
entityCollectionResWrap = container.get().getCommander().getClient()
|
||||
.getDeserializer(ContentType.APPLICATION_JSON)
|
||||
.toEntitySet(new ByteArrayInputStream(container.get().getResponseData().getBytes())));
|
||||
.toEntitySet(new ByteArrayInputStream(transportWrapper.getResponseData().getBytes()));
|
||||
|
||||
} catch (Exception exception) {
|
||||
LOG.error(exception);
|
||||
} finally {
|
||||
LOG.info("Time taken: "
|
||||
+ (transportWrapper.getElapsedTimeMillis() >= 1000 ? (transportWrapper.getElapsedTimeMillis() / 1000) + "s" : transportWrapper.getElapsedTimeMillis() + "ms"));
|
||||
|
||||
//assumes the 0th key is always used. TODO: determine if we need to scan the keys.
|
||||
final List<EdmKeyPropertyRef> keyFields = container.get().getEdm().getEntityContainer()
|
||||
.getEntitySet(resourceName).getEntityType().getKeyPropertyRefs();
|
||||
|
||||
LOG.debug("Keys is: " + keyFields.stream().map(EdmKeyPropertyRef::getName).collect(Collectors.joining(", ")));
|
||||
|
||||
assert(keyFields.size() > 0) :
|
||||
assert (keyFields.size() > 0) :
|
||||
getDefaultErrorMessage("no Key Fields found! Resources MUST have at least one key.");
|
||||
|
||||
//we will always key the hash by the first key, the other key fields will still be there
|
||||
//for the MUST requirement checks
|
||||
final String keyField = keyFields.get(0).getName();
|
||||
LOG.info("Hashing " + resourceName + " payload values...");
|
||||
entitySet.get().getPayload().getEntities().forEach(entity -> {
|
||||
entity.getProperties().parallelStream().forEach(property -> {
|
||||
entityCollectionResWrap.getPayload().getEntities().forEach(entity -> {
|
||||
entity.getProperties().forEach(property -> {
|
||||
encodedSamples.get().computeIfAbsent(resourceName, k -> new LinkedHashMap<>());
|
||||
|
||||
if (!encodedSamples.get().get(resourceName).containsKey(keyField)) {
|
||||
encodedSamples.get().get(resourceName).put(entity.getProperty(keyField).getValue().toString(), new LinkedList<>());
|
||||
}
|
||||
|
||||
final Vector<String> fieldMeta = new Vector<>();
|
||||
fieldMeta.setSize(3);
|
||||
fieldMeta.setSize(2);
|
||||
fieldMeta.set(0, property.getName());
|
||||
fieldMeta.set(1, property.getName().contentEquals(MODIFICATION_TIMESTAMP_FIELD) || keyFields.stream().reduce(false,
|
||||
(acc, f) -> acc && f.getName().contentEquals(property.getName()), Boolean::logicalAnd)
|
||||
? property.getValue().toString() : (property.getValue() != null
|
||||
? hashValues(property.getValue().toString()) : null));
|
||||
fieldMeta.set(2, property.getValue() == null ? null :
|
||||
(keyFields.stream().anyMatch(f -> f.getName().contentEquals(property.getName())) ? "primaryKey" : null));
|
||||
|
||||
LOG.info("Adding fieldMeta: " + fieldMeta.toString());
|
||||
|
||||
encodedSamples.get().get(resourceName).get(entity.getProperty(keyField).getValue().toString()).add(fieldMeta);
|
||||
|
||||
if (property.getName().contentEquals(MODIFICATION_TIMESTAMP_FIELD)) {
|
||||
if (ZonedDateTime.parse(property.getValue().toString()).isBefore(lastFetchedDate.get())) {
|
||||
lastFetchedDate.set(ZonedDateTime.parse(property.getValue().toString()));
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
LOG.info("Values encoded!");
|
||||
//createDataAvailabilityReport(encodedSamples);
|
||||
|
||||
recordsProcessed += entityCollectionResWrap.getPayload().getEntities().size();
|
||||
|
||||
LOG.info("Records processed: " + recordsProcessed + " of " + targetRecordCount + "\n");
|
||||
} catch (Exception ex) {
|
||||
getDefaultErrorMessage(ex.toString());
|
||||
}
|
||||
LOG.info("Records Sampled: " + encodedSamples.get().get(resourceName).values().size());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
} while (recordsProcessed < targetRecordCount);
|
||||
return null;
|
||||
}
|
||||
|
||||
public void createDataAvailabilityReport(AtomicReference<Map<String, Map<String, List<Vector<String>>>>> encodedSamples) {
|
||||
|
|
|
@ -112,4 +112,8 @@ public class Utils {
|
|||
return ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT);
|
||||
}
|
||||
|
||||
public static String getIsoTimestamp(ZonedDateTime fromDate) {
|
||||
return ZonedDateTime.from(fromDate.toInstant()).format(DateTimeFormatter.ISO_INSTANT);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue