A bit of cleanup around subscription wiring following in-memory matcher

landing
This commit is contained in:
James Agnew 2018-11-20 13:59:31 -05:00
parent 471335341a
commit 7cbad7f4e3
30 changed files with 869 additions and 587 deletions

View File

@ -4,8 +4,13 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.HapiLocalizer; import ca.uhn.fhir.i18n.HapiLocalizer;
import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider; import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider; import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.IStaleSearchDeletingSvc;
import ca.uhn.fhir.jpa.search.StaleSearchDeletingSvcImpl;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc; import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl; import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl;
import ca.uhn.fhir.jpa.subscription.email.SubscriptionEmailInterceptor;
import ca.uhn.fhir.jpa.subscription.resthook.SubscriptionRestHookInterceptor;
import ca.uhn.fhir.jpa.subscription.websocket.SubscriptionWebsocketInterceptor;
import org.hibernate.jpa.HibernatePersistenceProvider; import org.hibernate.jpa.HibernatePersistenceProvider;
import org.springframework.beans.factory.annotation.Autowire; import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -34,9 +39,9 @@ import javax.annotation.Nonnull;
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -118,6 +123,34 @@ public abstract class BaseConfig implements SchedulingConfigurer {
return new ResourceReindexingSvcImpl(); return new ResourceReindexingSvcImpl();
} }
@Bean
public IStaleSearchDeletingSvc staleSearchDeletingSvc() {
return new StaleSearchDeletingSvcImpl();
}
/**
* Note: If you're going to use this, you need to provide a bean
* of type {@link ca.uhn.fhir.jpa.subscription.email.IEmailSender}
* in your own Spring config
*/
@Bean
@Lazy
public SubscriptionEmailInterceptor subscriptionEmailInterceptor() {
return new SubscriptionEmailInterceptor();
}
@Bean
@Lazy
public SubscriptionRestHookInterceptor subscriptionRestHookInterceptor() {
return new SubscriptionRestHookInterceptor();
}
@Bean
@Lazy
public SubscriptionWebsocketInterceptor subscriptionWebsocketInterceptor() {
return new SubscriptionWebsocketInterceptor();
}
public static void configureEntityManagerFactory(LocalContainerEntityManagerFactoryBean theFactory, FhirContext theCtx) { public static void configureEntityManagerFactory(LocalContainerEntityManagerFactoryBean theFactory, FhirContext theCtx) {
theFactory.setJpaDialect(hibernateJpaDialect(theCtx.getLocalizer())); theFactory.setJpaDialect(hibernateJpaDialect(theCtx.getLocalizer()));
theFactory.setPackagesToScan("ca.uhn.fhir.jpa.entity"); theFactory.setPackagesToScan("ca.uhn.fhir.jpa.entity");

View File

@ -160,6 +160,8 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
protected IResourceTagDao myResourceTagDao; protected IResourceTagDao myResourceTagDao;
@Autowired @Autowired
protected IResourceSearchViewDao myResourceViewDao; protected IResourceSearchViewDao myResourceViewDao;
@Autowired
protected ISearchParamRegistry mySearchParamRegistry;
@Autowired(required = true) @Autowired(required = true)
private DaoConfig myConfig; private DaoConfig myConfig;
private FhirContext myContext; private FhirContext myContext;
@ -171,8 +173,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
private ISearchParamExtractor mySearchParamExtractor; private ISearchParamExtractor mySearchParamExtractor;
@Autowired @Autowired
private ISearchParamPresenceSvc mySearchParamPresenceSvc; private ISearchParamPresenceSvc mySearchParamPresenceSvc;
@Autowired
protected ISearchParamRegistry mySearchParamRegistry;
//@Autowired //@Autowired
//private ISearchResultDao mySearchResultDao; //private ISearchResultDao mySearchResultDao;
@Autowired @Autowired
@ -188,11 +188,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
private ApplicationContext myApplicationContext; private ApplicationContext myApplicationContext;
public static void clearRequestAsProcessingSubRequest(ServletRequestDetails theRequestDetails) {
if (theRequestDetails != null) {
theRequestDetails.getUserData().remove(PROCESSING_SUB_REQUEST);
}
}
/** /**
* Returns the newly created forced ID. If the entity already had a forced ID, or if * Returns the newly created forced ID. If the entity already had a forced ID, or if
* none was created, returns null. * none was created, returns null.
@ -454,7 +449,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
} }
} }
private void extractTagsHapi(IResource theResource, ResourceTable theEntity, Set<ResourceTag> allDefs) { private void extractTagsHapi(IResource theResource, ResourceTable theEntity, Set<ResourceTag> allDefs) {
TagList tagList = ResourceMetadataKeyEnum.TAG_LIST.get(theResource); TagList tagList = ResourceMetadataKeyEnum.TAG_LIST.get(theResource);
if (tagList != null) { if (tagList != null) {
@ -553,6 +547,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
} }
} }
} }
protected void flushJpaSession() { protected void flushJpaSession() {
SessionImpl session = (SessionImpl) myEntityManager.unwrap(Session.class); SessionImpl session = (SessionImpl) myEntityManager.unwrap(Session.class);
int insertionCount = session.getActionQueue().numberOfInsertions(); int insertionCount = session.getActionQueue().numberOfInsertions();
@ -751,12 +746,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
return LogicalReferenceHelper.isLogicalReference(myConfig, theId); return LogicalReferenceHelper.isLogicalReference(myConfig, theId);
} }
public static void markRequestAsProcessingSubRequest(ServletRequestDetails theRequestDetails) {
if (theRequestDetails != null) {
theRequestDetails.getUserData().put(PROCESSING_SUB_REQUEST, Boolean.TRUE);
}
}
@Override @Override
public SearchBuilder newSearchBuilder() { public SearchBuilder newSearchBuilder() {
return beanFactory.getBean(SearchBuilder.class, this); return beanFactory.getBean(SearchBuilder.class, this);
@ -779,33 +768,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
} }
} }
public String parseContentTextIntoWords(IBaseResource theResource) {
StringBuilder retVal = new StringBuilder();
@SuppressWarnings("rawtypes")
List<IPrimitiveType> childElements = getContext().newTerser().getAllPopulatedChildElementsOfType(theResource, IPrimitiveType.class);
for (@SuppressWarnings("rawtypes")
IPrimitiveType nextType : childElements) {
if (nextType instanceof StringDt || nextType.getClass().getSimpleName().equals("StringType")) {
String nextValue = nextType.getValueAsString();
if (isNotBlank(nextValue)) {
retVal.append(nextValue.replace("\n", " ").replace("\r", " "));
retVal.append("\n");
}
}
}
return retVal.toString();
}
public void populateFullTextFields(final IBaseResource theResource, ResourceTable theEntity) {
if (theEntity.getDeleted() != null) {
theEntity.setNarrativeTextParsedIntoWords(null);
theEntity.setContentTextParsedIntoWords(null);
} else {
theEntity.setNarrativeTextParsedIntoWords(parseNarrativeTextIntoWords(theResource));
theEntity.setContentTextParsedIntoWords(parseContentTextIntoWords(theResource));
}
}
private void populateResourceIdFromEntity(IBaseResourceEntity theEntity, final IBaseResource theResource) { private void populateResourceIdFromEntity(IBaseResourceEntity theEntity, final IBaseResource theResource) {
IIdType id = theEntity.getIdDt(); IIdType id = theEntity.getIdDt();
if (getContext().getVersion().getVersion().isRi()) { if (getContext().getVersion().getVersion().isRi()) {
@ -1098,7 +1060,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
} }
} }
/** /**
* This method is called when an update to an existing resource detects that the resource supplied for update is missing a tag/profile/security label that the currently persisted resource holds. * This method is called when an update to an existing resource detects that the resource supplied for update is missing a tag/profile/security label that the currently persisted resource holds.
* <p> * <p>
@ -1312,7 +1273,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
EncodedResource changed; EncodedResource changed;
if (theDeletedTimestampOrNull != null) { if (theDeletedTimestampOrNull != null) {
newParams = new ResourceIndexedSearchParams(); newParams = new ResourceIndexedSearchParams();
theEntity.setDeleted(theDeletedTimestampOrNull); theEntity.setDeleted(theDeletedTimestampOrNull);
@ -1340,10 +1301,10 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
} else { } else {
theEntity.setLanguage(((IAnyResource) theResource).getLanguageElement().getValue()); theEntity.setLanguage(((IAnyResource) theResource).getLanguageElement().getValue());
} }
newParams.setParamsOn(theEntity); newParams.setParamsOn(theEntity);
theEntity.setIndexStatus(INDEX_STATUS_INDEXED); theEntity.setIndexStatus(INDEX_STATUS_INDEXED);
populateFullTextFields(theResource, theEntity); populateFullTextFields(myContext, theResource, theEntity);
} else { } else {
changed = populateResourceIntoEntity(theRequest, theResource, theEntity, false); changed = populateResourceIntoEntity(theRequest, theResource, theEntity, false);
@ -1634,6 +1595,50 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
} }
@Override
public ISearchParamRegistry getSearchParamRegistry() {
return mySearchParamRegistry;
}
public static void clearRequestAsProcessingSubRequest(ServletRequestDetails theRequestDetails) {
if (theRequestDetails != null) {
theRequestDetails.getUserData().remove(PROCESSING_SUB_REQUEST);
}
}
public static void markRequestAsProcessingSubRequest(ServletRequestDetails theRequestDetails) {
if (theRequestDetails != null) {
theRequestDetails.getUserData().put(PROCESSING_SUB_REQUEST, Boolean.TRUE);
}
}
public static String parseContentTextIntoWords(FhirContext theContext, IBaseResource theResource) {
StringBuilder retVal = new StringBuilder();
@SuppressWarnings("rawtypes")
List<IPrimitiveType> childElements = theContext.newTerser().getAllPopulatedChildElementsOfType(theResource, IPrimitiveType.class);
for (@SuppressWarnings("rawtypes")
IPrimitiveType nextType : childElements) {
if (nextType instanceof StringDt || nextType.getClass().getSimpleName().equals("StringType")) {
String nextValue = nextType.getValueAsString();
if (isNotBlank(nextValue)) {
retVal.append(nextValue.replace("\n", " ").replace("\r", " "));
retVal.append("\n");
}
}
}
return retVal.toString();
}
public static void populateFullTextFields(final FhirContext theContext, final IBaseResource theResource, ResourceTable theEntity) {
if (theEntity.getDeleted() != null) {
theEntity.setNarrativeTextParsedIntoWords(null);
theEntity.setContentTextParsedIntoWords(null);
} else {
theEntity.setNarrativeTextParsedIntoWords(parseNarrativeTextIntoWords(theResource));
theEntity.setContentTextParsedIntoWords(parseContentTextIntoWords(theContext, theResource));
}
}
public static String decodeResource(byte[] theResourceBytes, ResourceEncodingEnum theResourceEncoding) { public static String decodeResource(byte[] theResourceBytes, ResourceEncodingEnum theResourceEncoding) {
String resourceText = null; String resourceText = null;
switch (theResourceEncoding) { switch (theResourceEncoding) {
@ -1761,8 +1766,4 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
"Resource with ID " + theEntity.getIdDt().getIdPart() + " exists but it is not of type " + theResourceName + ", found resource of type " + theEntity.getResourceType()); "Resource with ID " + theEntity.getIdDt().getIdPart() + " exists but it is not of type " + theResourceName + ", found resource of type " + theEntity.getResourceType());
} }
} }
public ISearchParamRegistry getSearchParamRegistry() {
return mySearchParamRegistry;
}
} }

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.dao; package ca.uhn.fhir.jpa.dao;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import java.util.Set; import java.util.Set;

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.dao; package ca.uhn.fhir.jpa.dao;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam; import ca.uhn.fhir.context.RuntimeSearchParam;

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.dao; package ca.uhn.fhir.jpa.dao;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.model.api.IQueryParameterAnd; import ca.uhn.fhir.model.api.IQueryParameterAnd;
import ca.uhn.fhir.model.api.IQueryParameterType; import ca.uhn.fhir.model.api.IQueryParameterType;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;

View File

@ -298,7 +298,7 @@ public class SearchParameterMap extends LinkedHashMap<String, List<List<? extend
* This method creates a URL query string representation of the parameters in this * This method creates a URL query string representation of the parameters in this
* object, excluding the part before the parameters, e.g. * object, excluding the part before the parameters, e.g.
* <p> * <p>
* <code>?name=smith&_sort=Patient:family</code> * <code>?name=smith&amp;_sort=Patient:family</code>
* </p> * </p>
* <p> * <p>
* This method <b>excludes</b> the <code>_count</code> parameter, * This method <b>excludes</b> the <code>_count</code> parameter,

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.dao.index; package ca.uhn.fhir.jpa.dao.index;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.data.IForcedIdDao; import ca.uhn.fhir.jpa.dao.data.IForcedIdDao;
import ca.uhn.fhir.jpa.entity.ForcedId; import ca.uhn.fhir.jpa.entity.ForcedId;

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.dao.index;
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.dao.index; package ca.uhn.fhir.jpa.dao.index;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.ConfigurationException; import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.context.RuntimeResourceDefinition;
@ -272,10 +292,6 @@ public class SearchParamExtractorService {
} }
} }
/**
* @return Returns a set containing all of the parameter names that
* were found to have a value
*/
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void extractResourceLinks(ResourceIndexedSearchParams theParams, ResourceTable theEntity, IBaseResource theResource, Date theUpdateTime, boolean lookUpReferencesInDatabase) { public void extractResourceLinks(ResourceIndexedSearchParams theParams, ResourceTable theEntity, IBaseResource theResource, Date theUpdateTime, boolean lookUpReferencesInDatabase) {
String resourceType = theEntity.getResourceType(); String resourceType = theEntity.getResourceType();

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.entity;
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -21,135 +21,36 @@ package ca.uhn.fhir.jpa.provider;
*/ */
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.jpa.subscription.ISubscriptionTriggeringSvc;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.warm.CacheWarmingSvcImpl;
import ca.uhn.fhir.jpa.dao.MatchUrlService;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor;
import ca.uhn.fhir.jpa.subscription.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.util.JpaConstants; import ca.uhn.fhir.jpa.util.JpaConstants;
import ca.uhn.fhir.rest.annotation.IdParam; import ca.uhn.fhir.rest.annotation.IdParam;
import ca.uhn.fhir.rest.annotation.Operation; import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam; import ca.uhn.fhir.rest.annotation.OperationParam;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.StringParam; import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.param.UriParam; import ca.uhn.fhir.rest.param.UriParam;
import ca.uhn.fhir.rest.server.IResourceProvider; import ca.uhn.fhir.rest.server.IResourceProvider;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.util.ParametersUtil;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.ValidateUtil;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.hl7.fhir.instance.model.IdType;
import org.hl7.fhir.instance.model.api.IBaseParameters; import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.PostConstruct; import java.util.List;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class SubscriptionTriggeringProvider implements IResourceProvider, ApplicationContextAware {
public class SubscriptionTriggeringProvider implements IResourceProvider {
public static final String RESOURCE_ID = "resourceId"; public static final String RESOURCE_ID = "resourceId";
public static final int DEFAULT_MAX_SUBMIT = 10000;
public static final String SEARCH_URL = "searchUrl"; public static final String SEARCH_URL = "searchUrl";
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringProvider.class);
private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>();
@Autowired @Autowired
private FhirContext myFhirContext; private FhirContext myFhirContext;
@Autowired @Autowired
private DaoRegistry myDaoRegistry; private ISubscriptionTriggeringSvc mySubscriptionTriggeringSvc;
private List<BaseSubscriptionInterceptor<?>> mySubscriptionInterceptorList;
private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT;
@Autowired
private ISearchCoordinatorSvc mySearchCoordinatorSvc;
@Autowired
private MatchUrlService myMatchUrlService;
private ApplicationContext myAppCtx;
private ExecutorService myExecutorService;
/**
* Sets the maximum number of resources that will be submitted in a single pass
*/
public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) {
Integer maxSubmitPerPass = theMaxSubmitPerPass;
if (maxSubmitPerPass == null) {
maxSubmitPerPass = DEFAULT_MAX_SUBMIT;
}
Validate.isTrue(maxSubmitPerPass > 0, "theMaxSubmitPerPass must be > 0");
myMaxSubmitPerPass = maxSubmitPerPass;
}
@SuppressWarnings("unchecked")
@PostConstruct
public void start() {
mySubscriptionInterceptorList = ObjectUtils.defaultIfNull(mySubscriptionInterceptorList, Collections.emptyList());
mySubscriptionInterceptorList = new ArrayList<>();
Collection values1 = myAppCtx.getBeansOfType(BaseSubscriptionInterceptor.class).values();
Collection<BaseSubscriptionInterceptor<?>> values = (Collection<BaseSubscriptionInterceptor<?>>) values1;
mySubscriptionInterceptorList.addAll(values);
LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("SubscriptionTriggering-%d")
.daemon(false)
.priority(Thread.NORM_PRIORITY)
.build();
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
ourLog.info("Note: Subscription triggering queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
StopWatch sw = new StopWatch();
try {
executorQueue.put(theRunnable);
} catch (InterruptedException theE) {
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + theE.toString());
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
}
};
myExecutorService = new ThreadPoolExecutor(
0,
10,
0L,
TimeUnit.MILLISECONDS,
executorQueue,
threadFactory,
rejectedExecutionHandler);
}
@Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) @Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
public IBaseParameters triggerSubscription( public IBaseParameters triggerSubscription(
@OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds, @OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds,
@OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls @OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls
) { ) {
return doTriggerSubscription(theResourceIds, theSearchUrls, null); return mySubscriptionTriggeringSvc.triggerSubscription(theResourceIds, theSearchUrls, null);
} }
@Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) @Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
@ -158,331 +59,13 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
@OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds, @OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds,
@OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls @OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls
) { ) {
return mySubscriptionTriggeringSvc.triggerSubscription(theResourceIds, theSearchUrls, theSubscriptionId);
// Throw a 404 if the subscription doesn't exist
IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getResourceDao("Subscription");
IIdType subscriptionId = theSubscriptionId;
if (subscriptionId.hasResourceType() == false) {
subscriptionId = subscriptionId.withResourceType("Subscription");
}
subscriptionDao.read(subscriptionId);
return doTriggerSubscription(theResourceIds, theSearchUrls, subscriptionId);
} }
private IBaseParameters doTriggerSubscription(@OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds, @OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls, @IdParam IIdType theSubscriptionId) {
if (mySubscriptionInterceptorList.isEmpty()) {
throw new PreconditionFailedException("Subscription processing not active on this server");
}
List<UriParam> resourceIds = ObjectUtils.defaultIfNull(theResourceIds, Collections.emptyList());
List<StringParam> searchUrls = ObjectUtils.defaultIfNull(theSearchUrls, Collections.emptyList());
// Make sure we have at least one resource ID or search URL
if (resourceIds.size() == 0 && searchUrls.size() == 0) {
throw new InvalidRequestException("No resource IDs or search URLs specified for triggering");
}
// Resource URLs must be compete
for (UriParam next : resourceIds) {
IdType resourceId = new IdType(next.getValue());
ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasResourceType(), RESOURCE_ID + " parameter must have resource type");
ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasIdPart(), RESOURCE_ID + " parameter must have resource ID part");
}
// Search URLs must be valid
for (StringParam next : searchUrls) {
if (next.getValue().contains("?") == false) {
throw new InvalidRequestException("Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
}
}
SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails();
jobDetails.setJobId(UUID.randomUUID().toString());
jobDetails.setRemainingResourceIds(resourceIds.stream().map(UriParam::getValue).collect(Collectors.toList()));
jobDetails.setRemainingSearchUrls(searchUrls.stream().map(StringParam::getValue).collect(Collectors.toList()));
if (theSubscriptionId != null) {
jobDetails.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue());
}
// Submit job for processing
synchronized (myActiveJobs) {
myActiveJobs.add(jobDetails);
}
ourLog.info("Subscription triggering requested for {} resource and {} search - Gave job ID: {}", resourceIds.size(), searchUrls.size(), jobDetails.getJobId());
// Create a parameters response
IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext);
IPrimitiveType<?> value = (IPrimitiveType<?>) myFhirContext.getElementDefinition("string").newInstance();
value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId);
ParametersUtil.addParameterToParameters(myFhirContext, retVal, "information", value);
return retVal;
}
@Override @Override
public Class<? extends IBaseResource> getResourceType() { public Class<? extends IBaseResource> getResourceType() {
return myFhirContext.getResourceDefinition("Subscription").getImplementingClass(); return myFhirContext.getResourceDefinition("Subscription").getImplementingClass();
} }
@Scheduled(fixedDelay = DateUtils.MILLIS_PER_SECOND)
public void runDeliveryPass() {
synchronized (myActiveJobs) {
if (myActiveJobs.isEmpty()) {
return;
}
String activeJobIds = myActiveJobs.stream().map(t->t.getJobId()).collect(Collectors.joining(", "));
ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds);
SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0);
runJob(activeJob);
// If the job is complete, remove it from the queue
if (activeJob.getRemainingResourceIds().isEmpty()) {
if (activeJob.getRemainingSearchUrls().isEmpty()) {
if (isBlank(activeJob.myCurrentSearchUuid)) {
myActiveJobs.remove(0);
String remainingJobsMsg = "";
if (myActiveJobs.size() > 0) {
remainingJobsMsg = "(" + myActiveJobs.size() + " jobs remaining)";
}
ourLog.info("Subscription triggering job {} is complete{}", activeJob.getJobId(), remainingJobsMsg);
}
}
}
}
}
private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
StopWatch sw = new StopWatch();
ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId());
// Submit individual resources
int totalSubmitted = 0;
List<Pair<String, Future<Void>>> futures = new ArrayList<>();
while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
totalSubmitted++;
String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
futures.add(Pair.of(nextResourceId, future));
}
// Make sure these all succeeded in submitting
if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
return;
}
// If we don't have an active search started, and one needs to be.. start it
if (isBlank(theJobDetails.getCurrentSearchUuid()) && theJobDetails.getRemainingSearchUrls().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0);
RuntimeResourceDefinition resourceDef = CacheWarmingSvcImpl.parseUrlResourceType(myFhirContext, nextSearchUrl);
String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?'));
String resourceType = resourceDef.getName();
IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType);
SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl);
IBundleProvider search = mySearchCoordinatorSvc.registerSearch(callingDao, params, resourceType, new CacheControlDirective());
theJobDetails.setCurrentSearchUuid(search.getUuid());
theJobDetails.setCurrentSearchResourceType(resourceType);
theJobDetails.setCurrentSearchCount(params.getCount());
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
}
// If we have an active search going, submit resources from it
if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) {
int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
int maxQuerySize = myMaxSubmitPerPass - totalSubmitted;
int toIndex = fromIndex + maxQuerySize;
if (theJobDetails.getCurrentSearchCount() != null) {
toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
}
ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
List<Long> resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), resourceIds.size());
int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex();
for (Long next : resourceIds) {
IBaseResource nextResource = resourceDao.readByPid(next);
Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
totalSubmitted++;
highestIndexSubmitted++;
}
if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
return;
}
theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted);
if (resourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
theJobDetails.setCurrentSearchResourceType(null);
theJobDetails.setCurrentSearchUuid(null);
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
theJobDetails.setCurrentSearchCount(null);
}
}
ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
}
private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Future<Void>>> theIdToFutures) {
for (Pair<String, Future<Void>> next : theIdToFutures) {
String nextDeliveredId = next.getKey();
try {
Future<Void> nextFuture = next.getValue();
nextFuture.get();
ourLog.info("Finished redelivering {}", nextDeliveredId);
} catch (Exception e) {
ourLog.error("Failure triggering resource " + nextDeliveredId, e);
return true;
}
}
// Clear the list since it will potentially get reused
theIdToFutures.clear();
return false;
}
private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
IFhirResourceDao<? extends IBaseResource> dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
IBaseResource resourceToTrigger = dao.read(resourceId);
return submitResource(theSubscriptionId, resourceToTrigger);
}
private Future<Void> submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId);
ResourceModifiedMessage msg = new ResourceModifiedMessage();
msg.setId(theResourceToTrigger.getIdElement());
msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE);
msg.setSubscriptionId(new IdType(theSubscriptionId).toUnqualifiedVersionless().getValue());
msg.setNewPayload(myFhirContext, theResourceToTrigger);
return myExecutorService.submit(()->{
for (int i = 0; ; i++) {
try {
for (BaseSubscriptionInterceptor<?> next : mySubscriptionInterceptorList) {
next.submitResourceModified(msg);
}
break;
} catch (Exception e) {
if (i >= 3) {
throw new InternalErrorException(e);
}
ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString());
Thread.sleep(1000);
}
}
return null;
});
}
public void cancelAll() {
synchronized (myActiveJobs) {
myActiveJobs.clear();
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
myAppCtx = applicationContext;
}
private static class SubscriptionTriggeringJobDetails {
private String myJobId;
private String mySubscriptionId;
private List<String> myRemainingResourceIds;
private List<String> myRemainingSearchUrls;
private String myCurrentSearchUuid;
private Integer myCurrentSearchCount;
private String myCurrentSearchResourceType;
private int myCurrentSearchLastUploadedIndex;
public Integer getCurrentSearchCount() {
return myCurrentSearchCount;
}
public void setCurrentSearchCount(Integer theCurrentSearchCount) {
myCurrentSearchCount = theCurrentSearchCount;
}
public String getCurrentSearchResourceType() {
return myCurrentSearchResourceType;
}
public void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
myCurrentSearchResourceType = theCurrentSearchResourceType;
}
public String getJobId() {
return myJobId;
}
public void setJobId(String theJobId) {
myJobId = theJobId;
}
public String getSubscriptionId() {
return mySubscriptionId;
}
public void setSubscriptionId(String theSubscriptionId) {
mySubscriptionId = theSubscriptionId;
}
public List<String> getRemainingResourceIds() {
return myRemainingResourceIds;
}
public void setRemainingResourceIds(List<String> theRemainingResourceIds) {
myRemainingResourceIds = theRemainingResourceIds;
}
public List<String> getRemainingSearchUrls() {
return myRemainingSearchUrls;
}
public void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
myRemainingSearchUrls = theRemainingSearchUrls;
}
public String getCurrentSearchUuid() {
return myCurrentSearchUuid;
}
public void setCurrentSearchUuid(String theCurrentSearchUuid) {
myCurrentSearchUuid = theCurrentSearchUuid;
}
public int getCurrentSearchLastUploadedIndex() {
return myCurrentSearchLastUploadedIndex;
}
public void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
}
}
} }

View File

@ -19,22 +19,23 @@ package ca.uhn.fhir.jpa.provider.dstu3;
* limitations under the License. * limitations under the License.
* #L% * #L%
*/ */
import java.util.*;
import javax.servlet.http.HttpServletRequest;
import ca.uhn.fhir.jpa.dao.ISearchParamRegistry;
import org.hl7.fhir.dstu3.model.*;
import org.hl7.fhir.dstu3.model.CapabilityStatement.*;
import org.hl7.fhir.dstu3.model.Enumerations.SearchParamType;
import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam; import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.IFhirSystemDao; import ca.uhn.fhir.jpa.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.dao.ISearchParamRegistry;
import ca.uhn.fhir.rest.server.RestfulServer; import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.util.CoverageIgnore; import ca.uhn.fhir.util.CoverageIgnore;
import ca.uhn.fhir.util.ExtensionConstants; import ca.uhn.fhir.util.ExtensionConstants;
import org.hl7.fhir.dstu3.model.*;
import org.hl7.fhir.dstu3.model.CapabilityStatement.*;
import org.hl7.fhir.dstu3.model.Enumerations.SearchParamType;
import javax.servlet.http.HttpServletRequest;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
@ -47,17 +48,16 @@ public class JpaConformanceProviderDstu3 extends org.hl7.fhir.dstu3.hapi.rest.se
private boolean myIncludeResourceCounts; private boolean myIncludeResourceCounts;
private RestfulServer myRestfulServer; private RestfulServer myRestfulServer;
private IFhirSystemDao<Bundle, Meta> mySystemDao; private IFhirSystemDao<Bundle, Meta> mySystemDao;
/** /**
* Constructor * Constructor
*/ */
@CoverageIgnore @CoverageIgnore
public JpaConformanceProviderDstu3(){ public JpaConformanceProviderDstu3() {
super(); super();
super.setCache(false); super.setCache(false);
setIncludeResourceCounts(true); setIncludeResourceCounts(true);
} }
/** /**
* Constructor * Constructor
*/ */
@ -66,11 +66,15 @@ public class JpaConformanceProviderDstu3 extends org.hl7.fhir.dstu3.hapi.rest.se
myRestfulServer = theRestfulServer; myRestfulServer = theRestfulServer;
mySystemDao = theSystemDao; mySystemDao = theSystemDao;
myDaoConfig = theDaoConfig; myDaoConfig = theDaoConfig;
mySearchParamRegistry = theSystemDao.getSearchParamRegistry();
super.setCache(false); super.setCache(false);
setSearchParamRegistry(theSystemDao.getSearchParamRegistry());
setIncludeResourceCounts(true); setIncludeResourceCounts(true);
} }
public void setSearchParamRegistry(ISearchParamRegistry theSearchParamRegistry) {
mySearchParamRegistry = theSearchParamRegistry;
}
@Override @Override
public CapabilityStatement getServerConformance(HttpServletRequest theRequest) { public CapabilityStatement getServerConformance(HttpServletRequest theRequest) {
CapabilityStatement retVal = myCachedValue; CapabilityStatement retVal = myCachedValue;
@ -87,7 +91,7 @@ public class JpaConformanceProviderDstu3 extends org.hl7.fhir.dstu3.hapi.rest.se
for (CapabilityStatementRestResourceComponent nextResource : nextRest.getResource()) { for (CapabilityStatementRestResourceComponent nextResource : nextRest.getResource()) {
nextResource.setVersioning(ResourceVersionPolicy.VERSIONEDUPDATE); nextResource.setVersioning(ResourceVersionPolicy.VERSIONEDUPDATE);
ConditionalDeleteStatus conditionalDelete = nextResource.getConditionalDelete(); ConditionalDeleteStatus conditionalDelete = nextResource.getConditionalDelete();
if (conditionalDelete == ConditionalDeleteStatus.MULTIPLE && myDaoConfig.isAllowMultipleDelete() == false) { if (conditionalDelete == ConditionalDeleteStatus.MULTIPLE && myDaoConfig.isAllowMultipleDelete() == false) {
nextResource.setConditionalDelete(ConditionalDeleteStatus.SINGLE); nextResource.setConditionalDelete(ConditionalDeleteStatus.SINGLE);
@ -110,42 +114,42 @@ public class JpaConformanceProviderDstu3 extends org.hl7.fhir.dstu3.hapi.rest.se
confSp.setDocumentation(runtimeSp.getDescription()); confSp.setDocumentation(runtimeSp.getDescription());
confSp.setDefinition(runtimeSp.getUri()); confSp.setDefinition(runtimeSp.getUri());
switch (runtimeSp.getParamType()) { switch (runtimeSp.getParamType()) {
case COMPOSITE: case COMPOSITE:
confSp.setType(SearchParamType.COMPOSITE); confSp.setType(SearchParamType.COMPOSITE);
break; break;
case DATE: case DATE:
confSp.setType(SearchParamType.DATE); confSp.setType(SearchParamType.DATE);
break; break;
case NUMBER: case NUMBER:
confSp.setType(SearchParamType.NUMBER); confSp.setType(SearchParamType.NUMBER);
break; break;
case QUANTITY: case QUANTITY:
confSp.setType(SearchParamType.QUANTITY); confSp.setType(SearchParamType.QUANTITY);
break; break;
case REFERENCE: case REFERENCE:
confSp.setType(SearchParamType.REFERENCE); confSp.setType(SearchParamType.REFERENCE);
break; break;
case STRING: case STRING:
confSp.setType(SearchParamType.STRING); confSp.setType(SearchParamType.STRING);
break; break;
case TOKEN: case TOKEN:
confSp.setType(SearchParamType.TOKEN); confSp.setType(SearchParamType.TOKEN);
break; break;
case URI: case URI:
confSp.setType(SearchParamType.URI); confSp.setType(SearchParamType.URI);
break; break;
case HAS: case HAS:
// Shouldn't happen // Shouldn't happen
break; break;
} }
} }
} }
} }
massage(retVal); massage(retVal);
retVal.getImplementation().setDescription(myImplementationDescription); retVal.getImplementation().setDescription(myImplementationDescription);
myCachedValue = retVal; myCachedValue = retVal;
return retVal; return retVal;
@ -154,7 +158,11 @@ public class JpaConformanceProviderDstu3 extends org.hl7.fhir.dstu3.hapi.rest.se
public boolean isIncludeResourceCounts() { public boolean isIncludeResourceCounts() {
return myIncludeResourceCounts; return myIncludeResourceCounts;
} }
public void setIncludeResourceCounts(boolean theIncludeResourceCounts) {
myIncludeResourceCounts = theIncludeResourceCounts;
}
/** /**
* Subclasses may override * Subclasses may override
*/ */
@ -171,10 +179,6 @@ public class JpaConformanceProviderDstu3 extends org.hl7.fhir.dstu3.hapi.rest.se
myImplementationDescription = theImplDesc; myImplementationDescription = theImplDesc;
} }
public void setIncludeResourceCounts(boolean theIncludeResourceCounts) {
myIncludeResourceCounts = theIncludeResourceCounts;
}
@Override @Override
public void setRestfulServer(RestfulServer theRestfulServer) { public void setRestfulServer(RestfulServer theRestfulServer) {
this.myRestfulServer = theRestfulServer; this.myRestfulServer = theRestfulServer;

View File

@ -66,9 +66,13 @@ public class JpaConformanceProviderR4 extends org.hl7.fhir.r4.hapi.rest.server.S
myRestfulServer = theRestfulServer; myRestfulServer = theRestfulServer;
mySystemDao = theSystemDao; mySystemDao = theSystemDao;
myDaoConfig = theDaoConfig; myDaoConfig = theDaoConfig;
mySearchParamRegistry = theSystemDao.getSearchParamRegistry();
super.setCache(false); super.setCache(false);
setIncludeResourceCounts(true); setIncludeResourceCounts(true);
setSearchParamRegistry(theSystemDao.getSearchParamRegistry());
}
public void setSearchParamRegistry(ISearchParamRegistry theSearchParamRegistry) {
mySearchParamRegistry = theSearchParamRegistry;
} }
@Override @Override

View File

@ -44,7 +44,11 @@ import java.util.Date;
/** /**
* Deletes old searches * Deletes old searches
*/ */
@Service //
// NOTE: This is not a @Service because we manually instantiate
// it in BaseConfig. This is so that we can override the definition
// in Smile.
//
public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc { public class StaleSearchDeletingSvcImpl implements IStaleSearchDeletingSvc {
public static final long DEFAULT_CUTOFF_SLACK = 10 * DateUtils.MILLIS_PER_SECOND; public static final long DEFAULT_CUTOFF_SLACK = 10 * DateUtils.MILLIS_PER_SECOND;
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(StaleSearchDeletingSvcImpl.class); private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(StaleSearchDeletingSvcImpl.class);

View File

@ -68,9 +68,9 @@ import java.util.concurrent.*;
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -25,20 +25,16 @@ import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import org.hl7.fhir.r4.model.Subscription; import org.hl7.fhir.r4.model.Subscription;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
public abstract class BaseSubscriptionSubscriber implements MessageHandler { public abstract class BaseSubscriptionSubscriber implements MessageHandler {
@Autowired
DaoRegistry myDaoRegistry;
private final Subscription.SubscriptionChannelType myChannelType; private final Subscription.SubscriptionChannelType myChannelType;
private final BaseSubscriptionInterceptor mySubscriptionInterceptor; private final BaseSubscriptionInterceptor mySubscriptionInterceptor;
@Autowired
DaoRegistry myDaoRegistry;
private IFhirResourceDao<?> mySubscriptionDao; private IFhirResourceDao<?> mySubscriptionDao;
/** /**
@ -49,6 +45,11 @@ public abstract class BaseSubscriptionSubscriber implements MessageHandler {
mySubscriptionInterceptor = theSubscriptionInterceptor; mySubscriptionInterceptor = theSubscriptionInterceptor;
} }
@SuppressWarnings("unused") // Don't delete, used in Smile
public void setDaoRegistry(DaoRegistry theDaoRegistry) {
myDaoRegistry = theDaoRegistry;
}
@PostConstruct @PostConstruct
public void setSubscriptionDao() { public void setSubscriptionDao() {
mySubscriptionDao = myDaoRegistry.getResourceDao("Subscription"); mySubscriptionDao = myDaoRegistry.getResourceDao("Subscription");
@ -85,7 +86,7 @@ public abstract class BaseSubscriptionSubscriber implements MessageHandler {
*/ */
static boolean subscriptionTypeApplies(String theSubscriptionChannelTypeCode, Subscription.SubscriptionChannelType theChannelType) { static boolean subscriptionTypeApplies(String theSubscriptionChannelTypeCode, Subscription.SubscriptionChannelType theChannelType) {
boolean subscriptionTypeApplies = false; boolean subscriptionTypeApplies = false;
if (theSubscriptionChannelTypeCode != null) { if (theSubscriptionChannelTypeCode != null) {
if (theChannelType.toCode().equals(theSubscriptionChannelTypeCode)) { if (theChannelType.toCode().equals(theSubscriptionChannelTypeCode)) {
subscriptionTypeApplies = true; subscriptionTypeApplies = true;
} }

View File

@ -0,0 +1,33 @@
package ca.uhn.fhir.jpa.subscription;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.rest.annotation.IdParam;
import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.param.UriParam;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IIdType;
import java.util.List;
public interface ISubscriptionTriggeringSvc {
IBaseParameters triggerSubscription(List<UriParam> theResourceIds, List<StringParam> theSearchUrls, @IdParam IIdType theSubscriptionId);
}

View File

@ -37,6 +37,8 @@ public class ResourceDeliveryMessage {
private transient CanonicalSubscription mySubscription; private transient CanonicalSubscription mySubscription;
@JsonProperty("subscription") @JsonProperty("subscription")
private String mySubscriptionString; private String mySubscriptionString;
@JsonProperty("payload")
private String myPayloadString;
@JsonIgnore @JsonIgnore
private transient IBaseResource myPayload; private transient IBaseResource myPayload;
@JsonProperty("payloadId") @JsonProperty("payloadId")
@ -60,8 +62,13 @@ public class ResourceDeliveryMessage {
} }
public IBaseResource getPayload(FhirContext theCtx) { public IBaseResource getPayload(FhirContext theCtx) {
Validate.notNull(myPayload); Validate.notNull(myPayloadString);
return myPayload; IBaseResource retVal = myPayload;
if (retVal == null) {
retVal = theCtx.newJsonParser().parseResource(myPayloadString);
myPayload = retVal;
}
return retVal;
} }
public IIdType getPayloadId(FhirContext theCtx) { public IIdType getPayloadId(FhirContext theCtx) {
@ -88,6 +95,7 @@ public class ResourceDeliveryMessage {
public void setPayload(FhirContext theCtx, IBaseResource thePayload) { public void setPayload(FhirContext theCtx, IBaseResource thePayload) {
myPayload = thePayload; myPayload = thePayload;
myPayloadString = theCtx.newJsonParser().encodeResourceToString(thePayload);
} }
public void setPayloadId(IIdType thePayloadId) { public void setPayloadId(IIdType thePayloadId) {

View File

@ -34,9 +34,9 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -0,0 +1,463 @@
package ca.uhn.fhir.jpa.subscription;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.MatchUrlService;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.warm.CacheWarmingSvcImpl;
import ca.uhn.fhir.rest.annotation.IdParam;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.param.UriParam;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.util.ParametersUtil;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.ValidateUtil;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.hl7.fhir.instance.model.IdType;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import static ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider.RESOURCE_ID;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@Service
public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc, ApplicationContextAware {
public static final int DEFAULT_MAX_SUBMIT = 10000;
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringProvider.class);
private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>();
@Autowired
private FhirContext myFhirContext;
@Autowired
private DaoRegistry myDaoRegistry;
private List<BaseSubscriptionInterceptor<?>> mySubscriptionInterceptorList;
private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT;
@Autowired
private ISearchCoordinatorSvc mySearchCoordinatorSvc;
@Autowired
private MatchUrlService myMatchUrlService;
private ApplicationContext myAppCtx;
private ExecutorService myExecutorService;
@Override
public IBaseParameters triggerSubscription(List<UriParam> theResourceIds, List<StringParam> theSearchUrls, @IdParam IIdType theSubscriptionId) {
if (mySubscriptionInterceptorList.isEmpty()) {
throw new PreconditionFailedException("Subscription processing not active on this server");
}
// Throw a 404 if the subscription doesn't exist
if (theSubscriptionId != null) {
IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getResourceDao("Subscription");
IIdType subscriptionId = theSubscriptionId;
if (subscriptionId.hasResourceType() == false) {
subscriptionId = subscriptionId.withResourceType("Subscription");
}
subscriptionDao.read(subscriptionId);
}
List<UriParam> resourceIds = ObjectUtils.defaultIfNull(theResourceIds, Collections.emptyList());
List<StringParam> searchUrls = ObjectUtils.defaultIfNull(theSearchUrls, Collections.emptyList());
// Make sure we have at least one resource ID or search URL
if (resourceIds.size() == 0 && searchUrls.size() == 0) {
throw new InvalidRequestException("No resource IDs or search URLs specified for triggering");
}
// Resource URLs must be compete
for (UriParam next : resourceIds) {
IdType resourceId = new IdType(next.getValue());
ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasResourceType(), RESOURCE_ID + " parameter must have resource type");
ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasIdPart(), RESOURCE_ID + " parameter must have resource ID part");
}
// Search URLs must be valid
for (StringParam next : searchUrls) {
if (next.getValue().contains("?") == false) {
throw new InvalidRequestException("Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
}
}
SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails();
jobDetails.setJobId(UUID.randomUUID().toString());
jobDetails.setRemainingResourceIds(resourceIds.stream().map(UriParam::getValue).collect(Collectors.toList()));
jobDetails.setRemainingSearchUrls(searchUrls.stream().map(StringParam::getValue).collect(Collectors.toList()));
if (theSubscriptionId != null) {
jobDetails.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue());
}
// Submit job for processing
synchronized (myActiveJobs) {
myActiveJobs.add(jobDetails);
}
ourLog.info("Subscription triggering requested for {} resource and {} search - Gave job ID: {}", resourceIds.size(), searchUrls.size(), jobDetails.getJobId());
// Create a parameters response
IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext);
IPrimitiveType<?> value = (IPrimitiveType<?>) myFhirContext.getElementDefinition("string").newInstance();
value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId);
ParametersUtil.addParameterToParameters(myFhirContext, retVal, "information", value);
return retVal;
}
@Scheduled(fixedDelay = DateUtils.MILLIS_PER_SECOND)
public void runDeliveryPass() {
synchronized (myActiveJobs) {
if (myActiveJobs.isEmpty()) {
return;
}
String activeJobIds = myActiveJobs.stream().map(t -> t.getJobId()).collect(Collectors.joining(", "));
ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds);
SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0);
runJob(activeJob);
// If the job is complete, remove it from the queue
if (activeJob.getRemainingResourceIds().isEmpty()) {
if (activeJob.getRemainingSearchUrls().isEmpty()) {
if (isBlank(activeJob.myCurrentSearchUuid)) {
myActiveJobs.remove(0);
String remainingJobsMsg = "";
if (myActiveJobs.size() > 0) {
remainingJobsMsg = "(" + myActiveJobs.size() + " jobs remaining)";
}
ourLog.info("Subscription triggering job {} is complete{}", activeJob.getJobId(), remainingJobsMsg);
}
}
}
}
}
private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
StopWatch sw = new StopWatch();
ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId());
// Submit individual resources
int totalSubmitted = 0;
List<Pair<String, Future<Void>>> futures = new ArrayList<>();
while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
totalSubmitted++;
String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
futures.add(Pair.of(nextResourceId, future));
}
// Make sure these all succeeded in submitting
if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
return;
}
// If we don't have an active search started, and one needs to be.. start it
if (isBlank(theJobDetails.getCurrentSearchUuid()) && theJobDetails.getRemainingSearchUrls().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0);
RuntimeResourceDefinition resourceDef = CacheWarmingSvcImpl.parseUrlResourceType(myFhirContext, nextSearchUrl);
String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?'));
String resourceType = resourceDef.getName();
IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType);
SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl);
IBundleProvider search = mySearchCoordinatorSvc.registerSearch(callingDao, params, resourceType, new CacheControlDirective());
theJobDetails.setCurrentSearchUuid(search.getUuid());
theJobDetails.setCurrentSearchResourceType(resourceType);
theJobDetails.setCurrentSearchCount(params.getCount());
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
}
// If we have an active search going, submit resources from it
if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) {
int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
int maxQuerySize = myMaxSubmitPerPass - totalSubmitted;
int toIndex = fromIndex + maxQuerySize;
if (theJobDetails.getCurrentSearchCount() != null) {
toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
}
ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
List<Long> resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), resourceIds.size());
int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex();
for (Long next : resourceIds) {
IBaseResource nextResource = resourceDao.readByPid(next);
Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
totalSubmitted++;
highestIndexSubmitted++;
}
if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
return;
}
theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted);
if (resourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
theJobDetails.setCurrentSearchResourceType(null);
theJobDetails.setCurrentSearchUuid(null);
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
theJobDetails.setCurrentSearchCount(null);
}
}
ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
}
private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Future<Void>>> theIdToFutures) {
for (Pair<String, Future<Void>> next : theIdToFutures) {
String nextDeliveredId = next.getKey();
try {
Future<Void> nextFuture = next.getValue();
nextFuture.get();
ourLog.info("Finished redelivering {}", nextDeliveredId);
} catch (Exception e) {
ourLog.error("Failure triggering resource " + nextDeliveredId, e);
return true;
}
}
// Clear the list since it will potentially get reused
theIdToFutures.clear();
return false;
}
private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
IFhirResourceDao<? extends IBaseResource> dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
IBaseResource resourceToTrigger = dao.read(resourceId);
return submitResource(theSubscriptionId, resourceToTrigger);
}
private Future<Void> submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId);
ResourceModifiedMessage msg = new ResourceModifiedMessage();
msg.setId(theResourceToTrigger.getIdElement());
msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE);
msg.setSubscriptionId(new IdType(theSubscriptionId).toUnqualifiedVersionless().getValue());
msg.setNewPayload(myFhirContext, theResourceToTrigger);
return myExecutorService.submit(() -> {
for (int i = 0; ; i++) {
try {
for (BaseSubscriptionInterceptor<?> next : mySubscriptionInterceptorList) {
next.submitResourceModified(msg);
}
break;
} catch (Exception e) {
if (i >= 3) {
throw new InternalErrorException(e);
}
ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString());
Thread.sleep(1000);
}
}
return null;
});
}
public void cancelAll() {
synchronized (myActiveJobs) {
myActiveJobs.clear();
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
myAppCtx = applicationContext;
}
/**
* Sets the maximum number of resources that will be submitted in a single pass
*/
public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) {
Integer maxSubmitPerPass = theMaxSubmitPerPass;
if (maxSubmitPerPass == null) {
maxSubmitPerPass = DEFAULT_MAX_SUBMIT;
}
Validate.isTrue(maxSubmitPerPass > 0, "theMaxSubmitPerPass must be > 0");
myMaxSubmitPerPass = maxSubmitPerPass;
}
@SuppressWarnings("unchecked")
@PostConstruct
public void start() {
mySubscriptionInterceptorList = ObjectUtils.defaultIfNull(mySubscriptionInterceptorList, Collections.emptyList());
mySubscriptionInterceptorList = new ArrayList<>();
Collection values1 = myAppCtx.getBeansOfType(BaseSubscriptionInterceptor.class).values();
Collection<BaseSubscriptionInterceptor<?>> values = (Collection<BaseSubscriptionInterceptor<?>>) values1;
mySubscriptionInterceptorList.addAll(values);
LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern("SubscriptionTriggering-%d")
.daemon(false)
.priority(Thread.NORM_PRIORITY)
.build();
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
ourLog.info("Note: Subscription triggering queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
StopWatch sw = new StopWatch();
try {
executorQueue.put(theRunnable);
} catch (InterruptedException theE) {
throw new RejectedExecutionException("Task " + theRunnable.toString() +
" rejected from " + theE.toString());
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
}
};
myExecutorService = new ThreadPoolExecutor(
0,
10,
0L,
TimeUnit.MILLISECONDS,
executorQueue,
threadFactory,
rejectedExecutionHandler);
}
private static class SubscriptionTriggeringJobDetails {
private String myJobId;
private String mySubscriptionId;
private List<String> myRemainingResourceIds;
private List<String> myRemainingSearchUrls;
private String myCurrentSearchUuid;
private Integer myCurrentSearchCount;
private String myCurrentSearchResourceType;
private int myCurrentSearchLastUploadedIndex;
public Integer getCurrentSearchCount() {
return myCurrentSearchCount;
}
public void setCurrentSearchCount(Integer theCurrentSearchCount) {
myCurrentSearchCount = theCurrentSearchCount;
}
public String getCurrentSearchResourceType() {
return myCurrentSearchResourceType;
}
public void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
myCurrentSearchResourceType = theCurrentSearchResourceType;
}
public String getJobId() {
return myJobId;
}
public void setJobId(String theJobId) {
myJobId = theJobId;
}
public String getSubscriptionId() {
return mySubscriptionId;
}
public void setSubscriptionId(String theSubscriptionId) {
mySubscriptionId = theSubscriptionId;
}
public List<String> getRemainingResourceIds() {
return myRemainingResourceIds;
}
public void setRemainingResourceIds(List<String> theRemainingResourceIds) {
myRemainingResourceIds = theRemainingResourceIds;
}
public List<String> getRemainingSearchUrls() {
return myRemainingSearchUrls;
}
public void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
myRemainingSearchUrls = theRemainingSearchUrls;
}
public String getCurrentSearchUuid() {
return myCurrentSearchUuid;
}
public void setCurrentSearchUuid(String theCurrentSearchUuid) {
myCurrentSearchUuid = theCurrentSearchUuid;
}
public int getCurrentSearchLastUploadedIndex() {
return myCurrentSearchLastUploadedIndex;
}
public void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
}
}
}

View File

@ -36,9 +36,6 @@ import java.util.Optional;
* of type {@link ca.uhn.fhir.jpa.subscription.email.IEmailSender} * of type {@link ca.uhn.fhir.jpa.subscription.email.IEmailSender}
* in your own Spring config * in your own Spring config
*/ */
@Component
@Lazy
public class SubscriptionEmailInterceptor extends BaseSubscriptionInterceptor { public class SubscriptionEmailInterceptor extends BaseSubscriptionInterceptor {
/** /**

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.subscription.matcher; package ca.uhn.fhir.jpa.subscription.matcher;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam; import ca.uhn.fhir.context.RuntimeSearchParam;

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.subscription.matcher; package ca.uhn.fhir.jpa.subscription.matcher;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
public class SubscriptionMatchResult { public class SubscriptionMatchResult {
// This could be an enum, but we may want to include details about unsupported matches in the future // This could be an enum, but we may want to include details about unsupported matches in the future
public static final SubscriptionMatchResult MATCH = new SubscriptionMatchResult(true); public static final SubscriptionMatchResult MATCH = new SubscriptionMatchResult(true);

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.subscription.matcher; package ca.uhn.fhir.jpa.subscription.matcher;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.subscription.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.ResourceModifiedMessage;
import org.slf4j.Logger; import org.slf4j.Logger;

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.matcher;
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.matcher;
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -22,21 +22,13 @@ package ca.uhn.fhir.jpa.subscription.resthook;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor; import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor;
import ca.uhn.fhir.jpa.subscription.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;
import java.util.Optional; import java.util.Optional;
@Component
@Lazy
public class SubscriptionRestHookInterceptor extends BaseSubscriptionInterceptor { public class SubscriptionRestHookInterceptor extends BaseSubscriptionInterceptor {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRestHookInterceptor.class);
@Autowired @Autowired
BeanFactory myBeanFactory; BeanFactory myBeanFactory;

View File

@ -20,32 +20,15 @@ package ca.uhn.fhir.jpa.subscription.websocket;
* #L% * #L%
*/ */
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.dao.data.ISubscriptionTableDao;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor; import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor;
import ca.uhn.fhir.jpa.subscription.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
import org.hl7.fhir.r4.model.Subscription; import org.hl7.fhir.r4.model.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.Optional; import java.util.Optional;
@Component
@Lazy
public class SubscriptionWebsocketInterceptor extends BaseSubscriptionInterceptor { public class SubscriptionWebsocketInterceptor extends BaseSubscriptionInterceptor {
@Autowired
private ISubscriptionTableDao mySubscriptionTableDao;
@Autowired
private PlatformTransactionManager myTxManager;
@Autowired
private IResourceTableDao myResourceTableDao;
@Override @Override
protected Optional<MessageHandler> createDeliveryHandler(CanonicalSubscription theSubscription) { protected Optional<MessageHandler> createDeliveryHandler(CanonicalSubscription theSubscription) {
return Optional.empty(); return Optional.empty();

View File

@ -0,0 +1,19 @@
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

View File

@ -10,7 +10,6 @@ import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Update; import ca.uhn.fhir.rest.annotation.Update;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome; import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor;
import ca.uhn.fhir.rest.server.IResourceProvider; import ca.uhn.fhir.rest.server.IResourceProvider;
import ca.uhn.fhir.rest.server.RestfulServer; import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
@ -23,6 +22,7 @@ import org.hl7.fhir.dstu3.model.*;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.*; import org.junit.*;
import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList; import java.util.ArrayList;
@ -30,9 +30,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.*;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/** /**
* Test the rest-hook subscriptions * Test the rest-hook subscriptions
@ -70,12 +68,15 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
ourRestServer.unregisterInterceptor(ourRestHookSubscriptionInterceptor); ourRestServer.unregisterInterceptor(ourRestHookSubscriptionInterceptor);
ourSubscriptionTriggeringProvider.cancelAll(); mySubscriptionTriggeringSvc.cancelAll();
ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(null); mySubscriptionTriggeringSvc.setMaxSubmitPerPass(null);
myDaoConfig.setSearchPreFetchThresholds(new DaoConfig().getSearchPreFetchThresholds()); myDaoConfig.setSearchPreFetchThresholds(new DaoConfig().getSearchPreFetchThresholds());
} }
@Autowired
private SubscriptionTriggeringSvcImpl mySubscriptionTriggeringSvc;
@Before @Before
public void beforeRegisterRestHookListener() { public void beforeRegisterRestHookListener() {
ourRestServer.registerInterceptor(ourRestHookSubscriptionInterceptor); ourRestServer.registerInterceptor(ourRestHookSubscriptionInterceptor);
@ -196,7 +197,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
waitForSize(50, ourUpdatedPatients); waitForSize(50, ourUpdatedPatients);
beforeReset(); beforeReset();
ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33); mySubscriptionTriggeringSvc.setMaxSubmitPerPass(33);
Parameters response = ourClient Parameters response = ourClient
.operation() .operation()
@ -252,7 +253,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
waitForSize(50, ourUpdatedPatients); waitForSize(50, ourUpdatedPatients);
beforeReset(); beforeReset();
ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33); mySubscriptionTriggeringSvc.setMaxSubmitPerPass(33);
Parameters response = ourClient Parameters response = ourClient
.operation() .operation()
@ -315,7 +316,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
waitForSize(0, ourUpdatedPatients); waitForSize(0, ourUpdatedPatients);
beforeReset(); beforeReset();
ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(50); mySubscriptionTriggeringSvc.setMaxSubmitPerPass(50);
Parameters response = ourClient Parameters response = ourClient
.operation() .operation()