diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java index ed116a381c3..3316ddb5912 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java @@ -240,6 +240,12 @@ public class LevelDBStore implements MetadataStore { } } + @Override + public void flushDB(boolean sync) { + // TODO: Implement flush for LevelDB. Until then this method does + // nothing and the sync flag is ignored. + } + @Override public void writeBatch(BatchOperation operation) throws IOException { List operations = diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java index 7d3bc6ba9a1..ee0f9e8fc8a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java @@ -131,6 +131,12 @@ public interface MetadataStore extends Closeable{ */ void compactDB() throws IOException; + /** + * Flushes the outstanding I/O operations of the DB. + * @param sync if true, the outstanding I/Os are also synced to the disk. + */ + void flushDB(boolean sync) throws IOException; + /** * Destroy the content of the specified database, * a destroyed database will not be able to load again. 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java index 2038d840dd4..1be89597078 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java @@ -279,6 +279,19 @@ public class RocksDBStore implements MetadataStore { } } + @Override + public void flushDB(boolean sync) throws IOException { + if (db != null) { + try { + // For RocksDB it is sufficient to flush the WAL, as the entire DB can + // be reconstructed from it. + db.flushWal(sync); + } catch (RocksDBException e) { + throw toIOException("Failed to flush db", e); + } + } + } + private void deleteQuietly(File fileOrDir) { if (fileOrDir != null && fileOrDir.exists()) { try { diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index fd572ada622..9a27a8c89b7 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -144,6 +144,7 @@ enum Result { CONTAINER_NOT_OPEN = 39; CONTAINER_MISSING = 40; BLOCK_TOKEN_VERIFICATION_FAILED = 41; + ERROR_IN_DB_SYNC = 42; } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index a818b511ba9..c57e92d21a1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -68,6 +68,8 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.DISK_OUT_OF_SPACE; import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.ERROR_IN_COMPACT_DB; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .Result.ERROR_IN_DB_SYNC; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.INVALID_CONTAINER_STATE; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos @@ -298,8 +300,14 @@ public class KeyValueContainer implements Container { @Override public void quasiClose() throws StorageContainerException { + // The DB must be synced during the quasi-close operation. + flushAndSyncDB(); + writeLock(); try { + // The sync is repeated under the write lock; it should be very light + // as a sync has already been done outside the lock. + flushAndSyncDB(); updateContainerData(containerData::quasiCloseContainer); } finally { writeUnlock(); @@ -308,16 +316,18 @@ public class KeyValueContainer implements Container { @Override public void close() throws StorageContainerException { + // The DB must be synced during the close operation. + flushAndSyncDB(); + writeLock(); try { + // The sync is repeated under the write lock; it should be very light + // as a sync has already been done outside the lock. + flushAndSyncDB(); updateContainerData(containerData::closeContainer); } finally { writeUnlock(); } - - // It is ok if this operation takes a bit of time. - // Close container is not expected to be instantaneous. 
- compactDB(); } /** @@ -365,6 +375,28 @@ public class KeyValueContainer implements Container { } } + /** + * Flushes the store obtained via BlockUtils.getDB with sync enabled and + * logs the container ID and block commit sequence ID on success. + * + * @throws StorageContainerException propagated unchanged if raised; any + * other IOException is wrapped with result ERROR_IN_DB_SYNC. + */ + private void flushAndSyncDB() throws StorageContainerException { + try (ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) { + db.getStore().flushDB(true); + LOG.info("Container {} is synced with bcsId {}.", + containerData.getContainerID(), + containerData.getBlockCommitSequenceId()); + } catch (StorageContainerException ex) { + // Propagate unchanged; only other IOExceptions are wrapped below. + throw ex; + } catch (IOException ex) { + LOG.error("Error in DB sync while closing container {}", + containerData.getContainerID(), ex); + throw new StorageContainerException(ex, ERROR_IN_DB_SYNC); + } + } + @Override public KeyValueContainerData getContainerData() { return containerData; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMarkUnhealthy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMarkUnhealthy.java index e11bca582d0..c3e67c7ae6b 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMarkUnhealthy.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMarkUnhealthy.java @@ -155,6 +155,9 @@ public class TestKeyValueContainerMarkUnhealthy { */ @Test public void testMarkQuasiClosedContainerAsUnhealthy() throws IOException { + // We need to create the container so the sync-on-quasi-close operation + // does not NPE. + keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId); keyValueContainer.quasiClose(); keyValueContainer.markContainerUnhealthy(); assertThat(keyValueContainerData.getState(), is(UNHEALTHY));