HDDS-882. Provide a config to optionally turn on/off the sync flag during chunk writes. Contributed by Shashikant Banerjee.
commit 8f3e12ff07
parent 5a3c7714c4
@@ -52,6 +52,9 @@ public final class OzoneConfigKeys {
   public static final boolean DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT =
       false;
 
+  public static final String DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY =
+      "dfs.container.chunk.write.sync";
+  public static final boolean DFS_CONTAINER_CHUNK_WRITE_SYNC_DEFAULT = true;
   /**
    * Ratis Port where containers listen to.
    */
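(Note, not part of the patch: a minimal sketch of how a component would consume the new key/default pair through Hadoop's Configuration API, the same pattern KeyValueHandler adopts further down. The class name ChunkSyncConfigDemo is hypothetical.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;

public final class ChunkSyncConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // With no explicit setting, getBoolean falls back to the compiled-in
    // default (true), so chunk writes stay synchronous out of the box.
    boolean doSyncWrite =
        conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY,
            OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_DEFAULT);
    System.out.println("sync chunk writes: " + doSyncWrite);
  }
}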
@@ -52,6 +52,14 @@
       running unit tests.
     </description>
   </property>
+  <property>
+    <name>dfs.container.chunk.write.sync</name>
+    <value>true</value>
+    <tag>OZONE, CONTAINER, MANAGEMENT</tag>
+    <description>Determines whether the chunk writes in the container happen as
+      sync I/O or buffered I/O operation.
+    </description>
+  </property>
   <property>
     <name>dfs.container.ratis.statemachinedata.sync.timeout</name>
     <value>10s</value>
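(Note, not part of the patch: the shipped default can be overridden per cluster in ozone-site.xml, or programmatically as in this hedged sketch; the class name ChunkSyncOverrideDemo is hypothetical.)

import org.apache.hadoop.conf.Configuration;

public final class ChunkSyncOverrideDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Equivalent to setting the property above to false in ozone-site.xml:
    // chunk writes then go through buffered I/O instead of sync I/O.
    conf.setBoolean("dfs.container.chunk.write.sync", false);
    System.out.println(conf.getBoolean("dfs.container.chunk.write.sync", true));
  }
}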
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
@@ -111,13 +112,17 @@ public class KeyValueHandler extends Handler {
   private final VolumeChoosingPolicy volumeChoosingPolicy;
   private final long maxContainerSize;
   private final AutoCloseableLock handlerLock;
+  private final boolean doSyncWrite;
 
   public KeyValueHandler(Configuration config, StateContext context,
       ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics) {
     super(config, context, contSet, volSet, metrics);
     containerType = ContainerType.KeyValueContainer;
     blockManager = new BlockManagerImpl(config);
-    chunkManager = new ChunkManagerImpl();
+    doSyncWrite =
+        conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY,
+            OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_DEFAULT);
+    chunkManager = new ChunkManagerImpl(doSyncWrite);
     long svcInterval = config
         .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
             OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
@@ -67,10 +67,11 @@ public final class ChunkUtils {
    * @param chunkInfo - Data stream to write.
    * @param data - The data buffer.
    * @param volumeIOStats
+   * @param sync whether to do fsync or not
    * @throws StorageContainerException
    */
   public static void writeData(File chunkFile, ChunkInfo chunkInfo,
-      ByteBuffer data, VolumeIOStats volumeIOStats)
+      ByteBuffer data, VolumeIOStats volumeIOStats, boolean sync)
       throws StorageContainerException, ExecutionException,
       InterruptedException, NoSuchAlgorithmException {
     int bufferSize = data.capacity();
@@ -88,12 +89,16 @@ public final class ChunkUtils {
 
     try {
       long writeTimeStart = Time.monotonicNow();
-      file =
+      file = sync ?
           AsynchronousFileChannel.open(chunkFile.toPath(),
               StandardOpenOption.CREATE,
               StandardOpenOption.WRITE,
               StandardOpenOption.SPARSE,
-              StandardOpenOption.SYNC);
+              StandardOpenOption.SYNC) :
+          AsynchronousFileChannel.open(chunkFile.toPath(),
+              StandardOpenOption.CREATE,
+              StandardOpenOption.WRITE,
+              StandardOpenOption.SPARSE);
       lock = file.lock().get();
       int size = file.write(data, chunkInfo.getOffset()).get();
       // Increment volumeIO stats here.
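(Note, not part of the patch: a self-contained sketch of what the flag changes at the file-channel level. With StandardOpenOption.SYNC every write is flushed to the storage device before its future completes; without it the OS page cache absorbs the write. The file name and class name are hypothetical.)

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;

public final class SyncOpenDemo {
  public static void main(String[] args)
      throws IOException, ExecutionException, InterruptedException {
    Path path = Paths.get("demo-chunk.tmp");
    boolean sync = true; // mirrors dfs.container.chunk.write.sync

    // Same ternary shape as the patch: one extra open option toggles
    // between durable (SYNC) and buffered writes.
    AsynchronousFileChannel file = sync
        ? AsynchronousFileChannel.open(path,
            StandardOpenOption.CREATE, StandardOpenOption.WRITE,
            StandardOpenOption.SPARSE, StandardOpenOption.SYNC)
        : AsynchronousFileChannel.open(path,
            StandardOpenOption.CREATE, StandardOpenOption.WRITE,
            StandardOpenOption.SPARSE);
    try {
      ByteBuffer data = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
      int written = file.write(data, 0).get(); // blocks until the async write completes
      System.out.println("wrote " + written + " bytes, sync=" + sync);
    } finally {
      file.close();
    }
  }
}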
@@ -54,6 +54,11 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
  */
 public class ChunkManagerImpl implements ChunkManager {
   static final Logger LOG = LoggerFactory.getLogger(ChunkManagerImpl.class);
+  private final boolean doSyncWrite;
+
+  public ChunkManagerImpl(boolean sync) {
+    doSyncWrite = sync;
+  }
 
   /**
    * writes a given chunk.
@@ -115,7 +120,8 @@ public class ChunkManagerImpl implements ChunkManager {
             "tmpChunkFile already exists" + tmpChunkFile + "Overwriting it.");
       }
       // Initially writes to temporary chunk file.
-      ChunkUtils.writeData(tmpChunkFile, info, data, volumeIOStats);
+      ChunkUtils
+          .writeData(tmpChunkFile, info, data, volumeIOStats, doSyncWrite);
       // No need to increment container stats here, as still data is not
       // committed here.
       break;
@@ -139,7 +145,7 @@ public class ChunkManagerImpl implements ChunkManager {
       break;
     case COMBINED:
       // directly write to the chunk file
-      ChunkUtils.writeData(chunkFile, info, data, volumeIOStats);
+      ChunkUtils.writeData(chunkFile, info, data, volumeIOStats, doSyncWrite);
       if (!isOverwrite) {
         containerData.incrBytesUsed(info.getLen());
       }
@@ -99,7 +99,7 @@ public class TestChunkManagerImpl {
         .getLocalID(), 0), 0, data.length);
 
     // Create a ChunkManager object.
-    chunkManager = new ChunkManagerImpl();
+    chunkManager = new ChunkManagerImpl(true);
 
   }
 
@@ -131,7 +131,7 @@ public class TestContainerPersistence {
     containerSet = new ContainerSet();
     volumeSet = new VolumeSet(DATANODE_UUID, conf);
     blockManager = new BlockManagerImpl(conf);
-    chunkManager = new ChunkManagerImpl();
+    chunkManager = new ChunkManagerImpl(true);
 
     for (String dir : conf.getStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY)) {
       StorageLocation location = StorageLocation.parse(dir);