Avoid DB binary storage deadlock (#2062)
* Avoid DB binary storage deadlock * Add changelog * Rework
This commit is contained in:
parent
b2853567f3
commit
c6777578a8
|
@ -0,0 +1,5 @@
|
||||||
|
---
|
||||||
|
type: fix
|
||||||
|
issue: 2062
|
||||||
|
title: "A deadlock was fixed where the Database-backed Binary Storage service could lock the system up when running
|
||||||
|
under very high contention."
|
|
@ -222,11 +222,6 @@ public class DaoConfig {
|
||||||
*/
|
*/
|
||||||
private boolean myLastNEnabled = false;
|
private boolean myLastNEnabled = false;
|
||||||
|
|
||||||
/**
|
|
||||||
* @since 5.1.0
|
|
||||||
*/
|
|
||||||
private boolean myPreloadBlobFromInputStream = false;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
*/
|
*/
|
||||||
|
@ -2102,28 +2097,11 @@ public class DaoConfig {
|
||||||
* </p>
|
* </p>
|
||||||
*
|
*
|
||||||
* @since 5.1.0
|
* @since 5.1.0
|
||||||
|
* @deprecated In 5.2.0 this setting no longer does anything
|
||||||
*/
|
*/
|
||||||
public boolean isPreloadBlobFromInputStream() {
|
@Deprecated
|
||||||
return myPreloadBlobFromInputStream;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>
|
|
||||||
* This determines whether $binary-access-write operations should first load the InputStream into memory before persisting the
|
|
||||||
* contents to the database. This needs to be enabled for MS SQL Server as this DB requires that the blob size be known
|
|
||||||
* in advance.
|
|
||||||
* </p>
|
|
||||||
* <p>
|
|
||||||
* Note that this setting should be enabled with caution as it can lead to significant demands on memory.
|
|
||||||
* </p>
|
|
||||||
* <p>
|
|
||||||
* The default value for this setting is {@code false}.
|
|
||||||
* </p>
|
|
||||||
*
|
|
||||||
* @since 5.1.0
|
|
||||||
*/
|
|
||||||
public void setPreloadBlobFromInputStream(Boolean thePreloadBlobFromInputStream) {
|
public void setPreloadBlobFromInputStream(Boolean thePreloadBlobFromInputStream) {
|
||||||
myPreloadBlobFromInputStream = thePreloadBlobFromInputStream;
|
// ignore
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,8 +60,15 @@ public class DatabaseBlobBinaryStorageSvcImpl extends BaseBinaryStorageSvcImpl {
|
||||||
private DaoConfig myDaoConfig;
|
private DaoConfig myDaoConfig;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Transactional(Transactional.TxType.SUPPORTS)
|
@Transactional(Transactional.TxType.REQUIRED)
|
||||||
public StoredDetails storeBlob(IIdType theResourceId, String theBlobIdOrNull, String theContentType, InputStream theInputStream) throws IOException {
|
public StoredDetails storeBlob(IIdType theResourceId, String theBlobIdOrNull, String theContentType, InputStream theInputStream) throws IOException {
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Note on transactionality: This method used to have a propagation value of SUPPORTS and then do the actual
|
||||||
|
* write in a new transaction.. I don't actually get why that was the original design, but it causes
|
||||||
|
* connection pool deadlocks under load!
|
||||||
|
*/
|
||||||
|
|
||||||
Date publishedDate = new Date();
|
Date publishedDate = new Date();
|
||||||
|
|
||||||
HashingInputStream hashingInputStream = createHashingInputStream(theInputStream);
|
HashingInputStream hashingInputStream = createHashingInputStream(theInputStream);
|
||||||
|
@ -77,32 +84,18 @@ public class DatabaseBlobBinaryStorageSvcImpl extends BaseBinaryStorageSvcImpl {
|
||||||
|
|
||||||
Session session = (Session) myEntityManager.getDelegate();
|
Session session = (Session) myEntityManager.getDelegate();
|
||||||
LobHelper lobHelper = session.getLobHelper();
|
LobHelper lobHelper = session.getLobHelper();
|
||||||
Blob dataBlob;
|
|
||||||
if (myDaoConfig.isPreloadBlobFromInputStream()) {
|
|
||||||
byte[] loadedStream = IOUtils.toByteArray(countingInputStream);
|
byte[] loadedStream = IOUtils.toByteArray(countingInputStream);
|
||||||
dataBlob = lobHelper.createBlob(loadedStream);
|
Blob dataBlob = lobHelper.createBlob(loadedStream);
|
||||||
} else {
|
|
||||||
dataBlob = lobHelper.createBlob(countingInputStream, 0);
|
|
||||||
}
|
|
||||||
entity.setBlob(dataBlob);
|
entity.setBlob(dataBlob);
|
||||||
|
|
||||||
// Save the entity
|
|
||||||
|
|
||||||
TransactionTemplate txTemplate = new TransactionTemplate(myPlatformTransactionManager);
|
|
||||||
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
|
|
||||||
txTemplate.execute(t -> {
|
|
||||||
myEntityManager.persist(entity);
|
|
||||||
return null;
|
|
||||||
});
|
|
||||||
|
|
||||||
// Update the entity with the final byte count and hash
|
// Update the entity with the final byte count and hash
|
||||||
long bytes = countingInputStream.getCount();
|
long bytes = countingInputStream.getCount();
|
||||||
String hash = hashingInputStream.hash().toString();
|
String hash = hashingInputStream.hash().toString();
|
||||||
txTemplate.execute(t -> {
|
entity.setSize((int) bytes);
|
||||||
myBinaryStorageEntityDao.setSize(id, (int) bytes);
|
entity.setHash(hash);
|
||||||
myBinaryStorageEntityDao.setHash(id, hash);
|
|
||||||
return null;
|
// Save the entity
|
||||||
});
|
myEntityManager.persist(entity);
|
||||||
|
|
||||||
return new StoredDetails()
|
return new StoredDetails()
|
||||||
.setBlobId(id)
|
.setBlobId(id)
|
||||||
|
|
|
@ -30,14 +30,6 @@ import java.util.Optional;
|
||||||
|
|
||||||
public interface IBinaryStorageEntityDao extends JpaRepository<BinaryStorageEntity, String> {
|
public interface IBinaryStorageEntityDao extends JpaRepository<BinaryStorageEntity, String> {
|
||||||
|
|
||||||
@Modifying
|
|
||||||
@Query("UPDATE BinaryStorageEntity e SET e.mySize = :blob_size WHERE e.myBlobId = :blob_id")
|
|
||||||
void setSize(@Param("blob_id") String theId, @Param("blob_size") int theBytes);
|
|
||||||
|
|
||||||
@Modifying
|
|
||||||
@Query("UPDATE BinaryStorageEntity e SET e.myHash = :blob_hash WHERE e.myBlobId = :blob_id")
|
|
||||||
void setHash(@Param("blob_id") String theId, @Param("blob_hash") String theHash);
|
|
||||||
|
|
||||||
@Query("SELECT e FROM BinaryStorageEntity e WHERE e.myBlobId = :blob_id AND e.myResourceId = :resource_id")
|
@Query("SELECT e FROM BinaryStorageEntity e WHERE e.myBlobId = :blob_id AND e.myResourceId = :resource_id")
|
||||||
Optional<BinaryStorageEntity> findByIdAndResourceId(@Param("blob_id") String theBlobId, @Param("resource_id") String theResourceId);
|
Optional<BinaryStorageEntity> findByIdAndResourceId(@Param("blob_id") String theBlobId, @Param("resource_id") String theResourceId);
|
||||||
|
|
||||||
|
|
|
@ -45,18 +45,6 @@ public class DatabaseBlobBinaryStorageSvcImplTest extends BaseJpaR4Test {
|
||||||
@Autowired
|
@Autowired
|
||||||
private DaoConfig myDaoConfig;
|
private DaoConfig myDaoConfig;
|
||||||
|
|
||||||
@BeforeEach
|
|
||||||
public void backupDaoConfig() {
|
|
||||||
defaultPreloadBlobFromInputStream = myDaoConfig.isPreloadBlobFromInputStream();
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterEach
|
|
||||||
public void restoreDaoConfig() {
|
|
||||||
myDaoConfig.setPreloadBlobFromInputStream(defaultPreloadBlobFromInputStream);
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean defaultPreloadBlobFromInputStream;
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStoreAndRetrieve() throws IOException {
|
public void testStoreAndRetrieve() throws IOException {
|
||||||
|
|
||||||
|
@ -74,7 +62,7 @@ public class DatabaseBlobBinaryStorageSvcImplTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
assertEquals(0, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
|
assertEquals(0, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
|
||||||
assertEquals(1, myCaptureQueriesListener.getInsertQueriesForCurrentThread().size());
|
assertEquals(1, myCaptureQueriesListener.getInsertQueriesForCurrentThread().size());
|
||||||
assertEquals(2, myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size());
|
assertEquals(0, myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size());
|
||||||
|
|
||||||
myCaptureQueriesListener.clear();
|
myCaptureQueriesListener.clear();
|
||||||
|
|
||||||
|
@ -128,7 +116,7 @@ public class DatabaseBlobBinaryStorageSvcImplTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
assertEquals(0, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
|
assertEquals(0, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
|
||||||
assertEquals(1, myCaptureQueriesListener.getInsertQueriesForCurrentThread().size());
|
assertEquals(1, myCaptureQueriesListener.getInsertQueriesForCurrentThread().size());
|
||||||
assertEquals(2, myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size());
|
assertEquals(0, myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size());
|
||||||
|
|
||||||
myCaptureQueriesListener.clear();
|
myCaptureQueriesListener.clear();
|
||||||
|
|
||||||
|
|
|
@ -89,4 +89,12 @@ public class BinaryStorageEntity {
|
||||||
public String getBlobId() {
|
public String getBlobId() {
|
||||||
return myBlobId;
|
return myBlobId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setSize(int theSize) {
|
||||||
|
mySize = theSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setHash(String theHash) {
|
||||||
|
myHash = theHash;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue