Add transaction write semaphore interceptor (#2744)

* Add transaction write semaphore interceptor

* Add changelog

* Test fix

* Two missing commits on TransactionConcurrencySemaphoreInterceptor

* License header
This commit is contained in:
James Agnew 2021-06-21 05:38:53 -04:00 committed by GitHub
parent f0cfb5ad1b
commit 0d0a0fd6a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 72 additions and 14 deletions

View File

@ -33,9 +33,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
* This interceptor uses semaphores to avoid multiple concurrent FHIR transaction
@ -49,6 +53,8 @@ public class TransactionConcurrencySemaphoreInterceptor {
private static final String HELD_SEMAPHORES = TransactionConcurrencySemaphoreInterceptor.class.getName() + "_HELD_SEMAPHORES";
private final Cache<String, Semaphore> mySemaphoreCache;
private final MemoryCacheService myMemoryCacheService;
private boolean myLogWaits;
private final Semaphore myLockingSemaphore = new Semaphore(1);
/**
* Constructor
@ -61,39 +67,89 @@ public class TransactionConcurrencySemaphoreInterceptor {
.build();
}
/**
* Should the interceptor log if a wait for a semaphore is required
*/
public boolean isLogWaits() {
return myLogWaits;
}
/**
* Should the interceptor log if a wait for a semaphore is required
*/
public void setLogWaits(boolean theLogWaits) {
myLogWaits = theLogWaits;
}
@Hook(Pointcut.STORAGE_TRANSACTION_WRITE_OPERATIONS_PRE)
public void pre(TransactionDetails theTransactionDetails, TransactionWriteOperationsDetails theWriteOperationsDetails) {
List<Semaphore> heldSemaphores = new ArrayList<>();
Map<String, Semaphore> pendingAndHeldSemaphores = new HashMap<>();
acquireSemaphoresForUrlList(heldSemaphores, theWriteOperationsDetails.getUpdateRequestUrls(), false);
acquireSemaphoresForUrlList(heldSemaphores, theWriteOperationsDetails.getConditionalCreateRequestUrls(), true);
AtomicBoolean locked = new AtomicBoolean(false);
try {
acquireSemaphoresForUrlList(locked, heldSemaphores, pendingAndHeldSemaphores, theWriteOperationsDetails.getUpdateRequestUrls(), false);
acquireSemaphoresForUrlList(locked, heldSemaphores, pendingAndHeldSemaphores, theWriteOperationsDetails.getConditionalCreateRequestUrls(), true);
pendingAndHeldSemaphores.keySet().removeIf(k -> pendingAndHeldSemaphores.get(k) == null);
if (!pendingAndHeldSemaphores.isEmpty()) {
if (isLogWaits()) {
ourLog.info("Waiting to acquire write semaphore for URLs:{}{}",
(pendingAndHeldSemaphores.size() > 1 ? "\n * " : ""),
(pendingAndHeldSemaphores.keySet().stream().sorted().collect(Collectors.joining("\n * "))));
}
for (Map.Entry<String, Semaphore> nextEntry : pendingAndHeldSemaphores.entrySet()) {
Semaphore nextSemaphore = nextEntry.getValue();
try {
if (nextSemaphore.tryAcquire(10, TimeUnit.SECONDS)) {
ourLog.trace("Acquired semaphore {} on request URL: {}", nextSemaphore, nextEntry.getKey());
heldSemaphores.add(nextSemaphore);
} else {
ourLog.warn("Timed out waiting for semaphore {} on request URL: {}", nextSemaphore, nextEntry.getKey());
break;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
theTransactionDetails.putUserData(HELD_SEMAPHORES, heldSemaphores);
} finally {
if (locked.get()) {
myLockingSemaphore.release();
}
}
}
private void acquireSemaphoresForUrlList(List<Semaphore> heldSemaphores, List<String> urls, boolean isConditionalCreates) {
for (String next : urls) {
private void acquireSemaphoresForUrlList(AtomicBoolean theLocked, List<Semaphore> theHeldSemaphores, Map<String, Semaphore> thePendingAndHeldSemaphores, List<String> urls, boolean isConditionalCreates) {
for (String nextUrl : urls) {
if (isConditionalCreates) {
if (myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.MATCH_URL, next) != null) {
if (myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.MATCH_URL, nextUrl) != null) {
continue;
}
}
Semaphore semaphore = mySemaphoreCache.get(next, t -> new Semaphore(1));
if (heldSemaphores.contains(semaphore)) {
Semaphore semaphore = mySemaphoreCache.get(nextUrl, t -> new Semaphore(1));
if (thePendingAndHeldSemaphores.containsKey(nextUrl)) {
continue;
}
if (!theLocked.get()) {
myLockingSemaphore.acquireUninterruptibly();
theLocked.set(true);
}
assert semaphore != null;
try {
if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
ourLog.warn("Timed out waiting for semaphore on request URL: {}", next);
if (semaphore.tryAcquire()) {
ourLog.trace("Acquired semaphore {} on request URL: {}", semaphore, nextUrl);
theHeldSemaphores.add(semaphore);
thePendingAndHeldSemaphores.put(nextUrl, null);
} else {
heldSemaphores.add(semaphore);
}
} catch (InterruptedException e) {
ourLog.warn("Interrupted during semaphore acquisition");
thePendingAndHeldSemaphores.put(nextUrl, semaphore);
}
}
}
@ -102,6 +158,7 @@ public class TransactionConcurrencySemaphoreInterceptor {
public void post(TransactionDetails theTransactionDetails) {
List<Semaphore> heldSemaphores = theTransactionDetails.getUserData(HELD_SEMAPHORES);
for (Semaphore next : heldSemaphores) {
ourLog.trace("Releasing semaphore {}", next);
next.release();
}
}

View File

@ -212,6 +212,7 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test {
public void testTransactionCreates_WithConcurrencySemaphore_DontLockOnCachedMatchUrlsForConditionalCreate() throws ExecutionException, InterruptedException {
myDaoConfig.setMatchUrlCacheEnabled(true);
myInterceptorRegistry.registerInterceptor(myConcurrencySemaphoreInterceptor);
myConcurrencySemaphoreInterceptor.setLogWaits(true);
Runnable creator = ()->{
BundleBuilder bb = new BundleBuilder(myFhirCtx);