More work on term service importing

This commit is contained in:
jamesagnew 2016-06-19 10:35:27 -04:00
parent a7cbb5c022
commit 36505c60d8
5 changed files with 94 additions and 185 deletions

View File

@ -314,21 +314,6 @@ public abstract class BaseParser implements IParser {
}
}
protected String determineResourceBaseUrl(String bundleBaseUrl, BundleEntry theEntry) {
IResource resource = theEntry.getResource();
if (resource == null) {
return null;
}
String resourceBaseUrl = null;
if (resource.getId() != null && resource.getId().hasBaseUrl()) {
if (!resource.getId().getBaseUrl().equals(bundleBaseUrl)) {
resourceBaseUrl = resource.getId().getBaseUrl();
}
}
return resourceBaseUrl;
}
protected abstract void doEncodeBundleToWriter(Bundle theBundle, Writer theWriter) throws IOException, DataFormatException;
protected abstract void doEncodeResourceToWriter(IBaseResource theResource, Writer theWriter) throws IOException, DataFormatException;

View File

@ -1,6 +1,5 @@
package ca.uhn.fhir.jpa.term;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
/*
@ -24,20 +23,18 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
*/
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -56,18 +53,20 @@ import org.apache.commons.lang3.Validate;
import org.springframework.beans.factory.annotation.Autowired;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink;
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink.RelationshipTypeEnum;
import ca.uhn.fhir.jpa.term.TerminologyLoaderSvc.LoincHierarchyHandler;
import ca.uhn.fhir.jpa.util.Counter;
import ca.uhn.fhir.rest.method.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.CoverageIgnore;
public class TerminologyLoaderSvc implements IHapiTerminologyLoaderSvc {
private static final int LOG_INCREMENT = 100000;
public static final String LOINC_FILE = "loinc.csv";
public static final String LOINC_HIERARCHY_FILE = "MULTI-AXIAL_HIERARCHY.CSV";
@ -79,14 +78,7 @@ public class TerminologyLoaderSvc implements IHapiTerminologyLoaderSvc {
@Autowired
private IHapiTerminologySvc myTermSvc;
private void cleanUpTemporaryFiles(Map<String, File> filenameToFile) {
ourLog.info("Finished terminology file import, cleaning up temporary files");
for (File nextFile : filenameToFile.values()) {
nextFile.delete();
}
}
private void dropCircularRefs(TermConcept theConcept, LinkedHashSet<String> theChain, Map<String, TermConcept> theCode2concept) {
private void dropCircularRefs(TermConcept theConcept, ArrayList<String> theChain, Map<String, TermConcept> theCode2concept, Counter theCircularCounter) {
theChain.add(theConcept.getCode());
for (Iterator<TermConceptParentChildLink> childIter = theConcept.getChildren().iterator(); childIter.hasNext();) {
@ -110,46 +102,25 @@ public class TerminologyLoaderSvc implements IHapiTerminologyLoaderSvc {
ourLog.info(b.toString(), theConcept.getCode());
childIter.remove();
} else {
dropCircularRefs(nextChild, theChain, theCode2concept);
dropCircularRefs(nextChild, theChain, theCode2concept, theCircularCounter);
}
}
theChain.remove(theConcept.getCode());
theChain.remove(theChain.size() - 1);
}
private Map<String, File> extractFiles(List<byte[]> theZipBytes, List<String> theExpectedFilenameFragments) {
Map<String, File> filenameToFile = new HashMap<String, File>();
private void extractFiles(List<byte[]> theZipBytes, List<String> theExpectedFilenameFragments) {
Set<String> foundFragments = new HashSet<String>();
for (byte[] nextZipBytes : theZipBytes) {
ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new ByteArrayInputStream(nextZipBytes)));
try {
for (ZipEntry nextEntry; (nextEntry = zis.getNextEntry()) != null;) {
ZippedFileInputStream inputStream = new ZippedFileInputStream(zis);
boolean want = false;
for (String next : theExpectedFilenameFragments) {
if (nextEntry.getName().contains(next)) {
want = true;
foundFragments.add(next);
}
}
if (!want) {
ourLog.info("Ignoring zip entry: {}", nextEntry.getName());
continue;
}
ourLog.info("Streaming ZIP entry {} into temporary file", nextEntry.getName());
File nextOutFile = File.createTempFile("hapi_fhir", ".csv");
nextOutFile.deleteOnExit();
OutputStream outputStream = new SinkOutputStream(new FileOutputStream(nextOutFile, false), nextEntry.getName());
try {
IOUtils.copyLarge(inputStream, outputStream);
} finally {
IOUtils.closeQuietly(outputStream);
}
filenameToFile.put(nextEntry.getName(), nextOutFile);
}
} catch (IOException e) {
throw new InternalErrorException(e);
@ -158,10 +129,12 @@ public class TerminologyLoaderSvc implements IHapiTerminologyLoaderSvc {
}
}
if (filenameToFile.size() != theExpectedFilenameFragments.size()) {
throw new InvalidRequestException("Invalid input zip file, expected zip to contain the following name fragments: " + theExpectedFilenameFragments + " but found: " + filenameToFile.keySet());
for (String next : theExpectedFilenameFragments) {
if (!foundFragments.contains(next)) {
throw new InvalidRequestException("Invalid input zip file, expected zip to contain the following name fragments: " + theExpectedFilenameFragments + " but found: " + foundFragments);
}
return filenameToFile;
}
}
public String firstNonBlank(String... theStrings) {
@ -185,18 +158,24 @@ public class TerminologyLoaderSvc implements IHapiTerminologyLoaderSvc {
return concept;
}
private void iterateOverZipFile(Map<String, File> theFilenameToFile, String fileNamePart, IRecordHandler handler, char theDelimiter, QuoteMode theQuoteMode) {
private void iterateOverZipFile(List<byte[]> theZipBytes, String fileNamePart, IRecordHandler handler, char theDelimiter, QuoteMode theQuoteMode) {
boolean found = false;
for (Entry<String, File> nextEntry : new HashMap<String, File>(theFilenameToFile).entrySet()) {
if (nextEntry.getKey().contains(fileNamePart)) {
ourLog.info("Processing file {}", nextEntry.getKey());
for (byte[] nextZipBytes : theZipBytes) {
ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new ByteArrayInputStream(nextZipBytes)));
try {
for (ZipEntry nextEntry; (nextEntry = zis.getNextEntry()) != null;) {
ZippedFileInputStream inputStream = new ZippedFileInputStream(zis);
String nextFilename = nextEntry.getName();
if (nextFilename.contains(fileNamePart)) {
ourLog.info("Processing file {}", nextFilename);
found = true;
Reader reader = null;
CSVParser parsed = null;
try {
reader = new BufferedReader(new FileReader(nextEntry.getValue()));
reader = new InputStreamReader(zis, Charsets.UTF_8);
CSVFormat format = CSVFormat.newFormat(theDelimiter).withFirstRecordAsHeader();
if (theQuoteMode != null) {
format = format.withQuote('"').withQuoteMode(theQuoteMode);
@ -206,28 +185,27 @@ public class TerminologyLoaderSvc implements IHapiTerminologyLoaderSvc {
ourLog.debug("Header map: {}", parsed.getHeaderMap());
int count = 0;
int logIncrement = 100000;
int nextLoggedCount = logIncrement;
int logIncrement = LOG_INCREMENT;
int nextLoggedCount = 0;
while (iter.hasNext()) {
CSVRecord nextRecord = iter.next();
handler.accept(nextRecord);
count++;
if (count >= nextLoggedCount) {
ourLog.info(" * Processed {} records in {}", count, fileNamePart);
ourLog.info(" * Processed {} records in {}", count, nextFilename);
nextLoggedCount += logIncrement;
}
}
ourLog.info("Deleting temporary file: {}", nextEntry.getValue());
nextEntry.getValue().delete();
theFilenameToFile.remove(nextEntry.getKey());
} catch (IOException e) {
throw new InternalErrorException(e);
}
}
}
} catch (IOException e) {
throw new InternalErrorException(e);
} finally {
IOUtils.closeQuietly(parsed);
IOUtils.closeQuietly(reader);
}
IOUtils.closeQuietly(zis);
}
}
@ -239,41 +217,35 @@ public class TerminologyLoaderSvc implements IHapiTerminologyLoaderSvc {
public UploadStatistics loadLoinc(List<byte[]> theZipBytes, RequestDetails theRequestDetails) {
List<String> expectedFilenameFragments = Arrays.asList(LOINC_FILE, LOINC_HIERARCHY_FILE);
Map<String, File> filenameToFile = extractFiles(theZipBytes, expectedFilenameFragments);
extractFiles(theZipBytes, expectedFilenameFragments);
ourLog.info("Beginning LOINC processing");
try {
return processLoincFiles(filenameToFile, theRequestDetails);
} finally {
cleanUpTemporaryFiles(filenameToFile);
}
return processLoincFiles(theZipBytes, theRequestDetails);
}
@Override
public UploadStatistics loadSnomedCt(List<byte[]> theZipBytes, RequestDetails theRequestDetails) {
List<String> expectedFilenameFragments = Arrays.asList(SCT_FILE_DESCRIPTION, SCT_FILE_RELATIONSHIP, SCT_FILE_CONCEPT);
Map<String, File> filenameToFile = extractFiles(theZipBytes, expectedFilenameFragments);
extractFiles(theZipBytes, expectedFilenameFragments);
ourLog.info("Beginning SNOMED CT processing");
try {
return processSnomedCtFiles(filenameToFile, theRequestDetails);
} finally {
cleanUpTemporaryFiles(filenameToFile);
}
return processSnomedCtFiles(theZipBytes, theRequestDetails);
}
UploadStatistics processLoincFiles(Map<String, File> filenameToFile, RequestDetails theRequestDetails) {
UploadStatistics processLoincFiles(List<byte[]> theZipBytes, RequestDetails theRequestDetails) {
final TermCodeSystemVersion codeSystemVersion = new TermCodeSystemVersion();
final Map<String, TermConcept> code2concept = new HashMap<String, TermConcept>();
IRecordHandler handler = new LoincHandler(codeSystemVersion, code2concept);
iterateOverZipFile(filenameToFile, LOINC_FILE, handler, ',', QuoteMode.NON_NUMERIC);
iterateOverZipFile(theZipBytes, LOINC_FILE, handler, ',', QuoteMode.NON_NUMERIC);
handler = new LoincHierarchyHandler(codeSystemVersion, code2concept);
iterateOverZipFile(filenameToFile, LOINC_HIERARCHY_FILE, handler, ',', QuoteMode.NON_NUMERIC);
iterateOverZipFile(theZipBytes, LOINC_HIERARCHY_FILE, handler, ',', QuoteMode.NON_NUMERIC);
theZipBytes.clear();
for (Iterator<Entry<String, TermConcept>> iter = code2concept.entrySet().iterator(); iter.hasNext();) {
Entry<String, TermConcept> next = iter.next();
@ -295,30 +267,36 @@ public class TerminologyLoaderSvc implements IHapiTerminologyLoaderSvc {
return new UploadStatistics(code2concept.size());
}
UploadStatistics processSnomedCtFiles(Map<String, File> filenameToFile, RequestDetails theRequestDetails) {
UploadStatistics processSnomedCtFiles(List<byte[]> theZipBytes, RequestDetails theRequestDetails) {
final TermCodeSystemVersion codeSystemVersion = new TermCodeSystemVersion();
final Map<String, TermConcept> id2concept = new HashMap<String, TermConcept>();
final Map<String, TermConcept> code2concept = new HashMap<String, TermConcept>();
final Set<String> validConceptIds = new HashSet<String>();
IRecordHandler handler = new SctHandlerConcept(validConceptIds);
iterateOverZipFile(filenameToFile, SCT_FILE_CONCEPT, handler, '\t', null);
iterateOverZipFile(theZipBytes, SCT_FILE_CONCEPT, handler, '\t', null);
ourLog.info("Have {} valid concept IDs", validConceptIds.size());
handler = new SctHandlerDescription(validConceptIds, code2concept, id2concept, codeSystemVersion);
iterateOverZipFile(filenameToFile, SCT_FILE_DESCRIPTION, handler, '\t', null);
iterateOverZipFile(theZipBytes, SCT_FILE_DESCRIPTION, handler, '\t', null);
ourLog.info("Got {} concepts, cloning map", code2concept.size());
final HashMap<String, TermConcept> rootConcepts = new HashMap<String, TermConcept>(code2concept);
handler = new SctHandlerRelationship(codeSystemVersion, rootConcepts, code2concept);
iterateOverZipFile(filenameToFile, SCT_FILE_RELATIONSHIP, handler, '\t', null);
iterateOverZipFile(theZipBytes, SCT_FILE_RELATIONSHIP, handler, '\t', null);
theZipBytes.clear();
ourLog.info("Done loading SNOMED CT files - {} root codes, {} total codes", rootConcepts.size(), code2concept.size());
Counter circularCounter = new Counter();
for (TermConcept next : rootConcepts.values()) {
dropCircularRefs(next, new LinkedHashSet<String>(), code2concept);
long count = circularCounter.getThenAdd();
float pct = ((float)count / rootConcepts.size()) * 100.0f;
ourLog.info(" * Scanning for circular refs - have scanned {} / {} codes ({}%)", count, rootConcepts.size(), pct);
dropCircularRefs(next, new ArrayList<String>(), code2concept, circularCounter);
}
codeSystemVersion.getConcepts().addAll(rootConcepts.values());
@ -332,20 +310,6 @@ public class TerminologyLoaderSvc implements IHapiTerminologyLoaderSvc {
myTermSvc = theTermSvc;
}
@CoverageIgnore
public static void main(String[] args) throws Exception {
TerminologyLoaderSvc svc = new TerminologyLoaderSvc();
// byte[] bytes = IOUtils.toByteArray(new FileInputStream("/Users/james/Downloads/SnomedCT_Release_INT_20160131_Full.zip"));
// svc.loadSnomedCt(bytes);
Map<String, File> files = new HashMap<String, File>();
files.put(SCT_FILE_CONCEPT, new File("/Users/james/tmp/sct/SnomedCT_Release_INT_20160131_Full/Terminology/sct2_Concept_Full_INT_20160131.txt"));
files.put(SCT_FILE_DESCRIPTION, new File("/Users/james/tmp/sct/SnomedCT_Release_INT_20160131_Full/Terminology/sct2_Description_Full-en_INT_20160131.txt"));
files.put(SCT_FILE_RELATIONSHIP, new File("/Users/james/tmp/sct/SnomedCT_Release_INT_20160131_Full/Terminology/sct2_Relationship_Full_INT_20160131.txt"));
svc.processSnomedCtFiles(files, null);
}
private interface IRecordHandler {
void accept(CSVRecord theRecord);
}
@ -534,57 +498,6 @@ public class TerminologyLoaderSvc implements IHapiTerminologyLoaderSvc {
}
private static class SinkOutputStream extends OutputStream {
private static final long LOG_INCREMENT = 10 * FileUtils.ONE_MB;
private int myBytes;
private String myFilename;
private long myNextLogCount = LOG_INCREMENT;
private FileOutputStream myWrap;
public SinkOutputStream(FileOutputStream theWrap, String theFilename) {
myWrap = theWrap;
myFilename = theFilename;
}
private void addCount(int theCount) {
myBytes += theCount;
if (myBytes > myNextLogCount) {
ourLog.info(" * Wrote {} of {}", FileUtils.byteCountToDisplaySize(myBytes), myFilename);
myNextLogCount = myBytes + LOG_INCREMENT;
}
}
@Override
public void close() throws IOException {
myWrap.close();
}
@Override
public void flush() throws IOException {
myWrap.flush();
}
@Override
public void write(byte[] theB) throws IOException {
myWrap.write(theB);
addCount(theB.length);
}
@Override
public void write(byte[] theB, int theOff, int theLen) throws IOException {
myWrap.write(theB, theOff, theLen);
addCount(theLen);
}
@Override
public void write(int theB) throws IOException {
myWrap.write(theB);
addCount(1);
}
}
private static class ZippedFileInputStream extends InputStream {
private ZipInputStream is;

View File

@ -0,0 +1,11 @@
package ca.uhn.fhir.jpa.util;
public class Counter {
private long myCount;
public long getThenAdd() {
return myCount++;
}
}

View File

@ -34,7 +34,7 @@ public class TerminologyLoaderSvcIntegrationTest extends BaseJpaDstu3Test {
files.put(TerminologyLoaderSvc.SCT_FILE_CONCEPT, new File("/Users/james/tmp/sct/SnomedCT_Release_INT_20160131_Full/Terminology/sct2_Concept_Full_INT_20160131.txt"));
files.put(TerminologyLoaderSvc.SCT_FILE_DESCRIPTION, new File("/Users/james/tmp/sct/SnomedCT_Release_INT_20160131_Full/Terminology/sct2_Description_Full-en_INT_20160131.txt"));
files.put(TerminologyLoaderSvc.SCT_FILE_RELATIONSHIP, new File("/Users/james/tmp/sct/SnomedCT_Release_INT_20160131_Full/Terminology/sct2_Relationship_Full_INT_20160131.txt"));
myLoader.processSnomedCtFiles(files, mySrd);
// myLoader.processSnomedCtFiles(files, mySrd);
}
}

View File

@ -12,6 +12,7 @@ import com.google.common.reflect.ClassPath.ClassInfo;
import ca.uhn.fhir.model.dstu.resource.OperationOutcome;
import ca.uhn.fhir.rest.client.exceptions.FhirClientConnectionException;
import ca.uhn.fhir.rest.client.exceptions.FhirClientInappropriateForServerException;
import ca.uhn.fhir.util.TestUtil;
public class ExceptionPropertiesTest {
@ -24,9 +25,8 @@ public class ExceptionPropertiesTest {
new FhirClientConnectionException(new Exception());
new NotImplementedOperationException("");
new NotImplementedOperationException(null, new OperationOutcome());
new FhirClientConnectionException("");
new FhirClientConnectionException(new Exception());
new FhirClientConnectionException("", new Exception());
new FhirClientInappropriateForServerException(new Exception());
new FhirClientInappropriateForServerException("", new Exception());
}
@SuppressWarnings("deprecation")