diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index d13aebae3e7..8059b5eee9d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -221,6 +221,16 @@ public final class OzoneConfigKeys { = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY; public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT; + public static final String DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY + = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY; + public static final int DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT + = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT; + public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY + = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY; + public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT + = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT; + public static final int DFS_CONTAINER_CHUNK_MAX_SIZE + = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE; public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR = "dfs.container.ratis.datanode.storage.dir"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index 8ea7a23b1a8..3cc46979b83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -62,6 +62,8 @@ public final class OzoneConsts { public static final String CONTAINER_PREFIX = "containers"; public static final String CONTAINER_META_PATH = "metadata"; public static final String CONTAINER_DATA_PATH = "data"; + public static final String CONTAINER_TEMPORARY_CHUNK_PREFIX = "tmp"; + public static final String CONTAINER_CHUNK_NAME_DELIMITER = "."; public static final String CONTAINER_ROOT_PREFIX = "repository"; public static final String FILE_HASH = "SHA-256"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java index b79f72b6e84..b41db77492e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java @@ -50,6 +50,14 @@ public final class ScmConfigKeys { = "dfs.container.ratis.rpc.type"; public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT = "GRPC"; + public static final String DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY + = "dfs.container.ratis.num.write.chunk.threads"; + public static final int DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT + = 60; + public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY = + "dfs.container.ratis.segment.size"; + public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT = + 128 * 1024 * 1024; // TODO : this is copied from OzoneConsts, may need to move to a better place public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size"; @@ -57,11 +65,6 @@ public final class ScmConfigKeys { public static final int OZONE_SCM_CHUNK_SIZE_DEFAULT = 16 * 1024 * 1024; public static final int OZONE_SCM_CHUNK_MAX_SIZE = 32 * 1024 * 1024; - public static final String OZONE_SCM_RATIS_SEGMENT_SIZE_KEY = - "ozone.scm.ratis.segment.size"; - public static final int OZONE_SCM_RATIS_SEGMENT_SIZE_DEFAULT = - 128 * 1024 * 1024; - public static final String OZONE_SCM_CLIENT_PORT_KEY = "ozone.scm.client.port"; public static final int OZONE_SCM_CLIENT_PORT_DEFAULT = 9860; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto index 00a8bf916a1..089020193f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/DatanodeContainerProtocol.proto @@ -325,11 +325,18 @@ message ChunkInfo { repeated KeyValue metadata = 5; } +enum Stage { + WRITE_DATA = 1; + COMMIT_DATA = 2; + COMBINED = 3; +} + message WriteChunkRequestProto { required Pipeline pipeline = 1; required string keyName = 2; required ChunkInfo chunkData = 3; - required bytes data = 4; + optional bytes data = 4; + optional Stage stage = 5 [default = COMBINED]; } message WriteChunkResponseProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java index b4ceb979f4d..4aa667ec268 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java @@ -19,6 +19,8 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; @@ -30,7 +32,10 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; import java.security.NoSuchAlgorithmException; import java.util.concurrent.ExecutionException; @@ -67,7 +72,7 @@ public ChunkManagerImpl(ContainerManager manager) { */ @Override public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info, - byte[] data) + byte[] data, ContainerProtos.Stage stage) throws StorageContainerException { // we don't want container manager to go away while we are writing chunks. containerManager.readLock(); @@ -81,13 +86,23 @@ public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info, ContainerData container = containerManager.readContainer(containerName); File chunkFile = ChunkUtils.validateChunk(pipeline, container, info); - long oldSize = chunkFile.length(); - ChunkUtils.writeData(chunkFile, info, data); - containerManager.incrWriteBytes(containerName, info.getLen()); - containerManager.incrWriteCount(containerName); - long newSize = chunkFile.length(); - containerManager.incrBytesUsed(containerName, newSize - oldSize); - } catch (ExecutionException | NoSuchAlgorithmException e) { + File tmpChunkFile = getTmpChunkFile(chunkFile, info); + + LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file", + info.getChunkName(), stage, chunkFile, tmpChunkFile); + switch (stage) { + case WRITE_DATA: + ChunkUtils.writeData(tmpChunkFile, info, data); + break; + case COMMIT_DATA: + commitChunk(tmpChunkFile, chunkFile, containerName, info.getLen()); + break; + case COMBINED: + ChunkUtils.writeData(tmpChunkFile, info, data); + commitChunk(tmpChunkFile, chunkFile, containerName, info.getLen()); + break; + } + } catch (ExecutionException | NoSuchAlgorithmException | IOException e) { LOG.error("write data failed. error: {}", e); throw new StorageContainerException("Internal error: ", e, CONTAINER_INTERNAL_ERROR); @@ -101,6 +116,29 @@ public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info, } } + // Create a temporary file in the same container directory + // in the format ".tmp" + private static File getTmpChunkFile(File chunkFile, ChunkInfo info) + throws StorageContainerException { + return new File(chunkFile.getParent(), + chunkFile.getName() + + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + + OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX); + } + + // Commit the chunk by renaming the temporary chunk file to chunk file + private void commitChunk(File tmpChunkFile, File chunkFile, + String containerName, long chunkLen) throws IOException { + long sizeDiff = tmpChunkFile.length() - chunkFile.length(); + // It is safe to replace here as the earlier chunk if existing should be + // caught as part of validateChunk + Files.move(tmpChunkFile.toPath(), chunkFile.toPath(), + StandardCopyOption.REPLACE_EXISTING); + containerManager.incrBytesUsed(containerName, sizeDiff); + containerManager.incrWriteCount(containerName); + containerManager.incrWriteBytes(containerName, chunkLen); + } + /** * reads the data defined by a chunk. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java index 900c4088531..40bc7ba4967 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java @@ -456,10 +456,17 @@ private ContainerCommandResponseProto handleWriteChunk( ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getWriteChunk() .getChunkData()); Preconditions.checkNotNull(chunkInfo); - byte[] data = msg.getWriteChunk().getData().toByteArray(); - metrics.incContainerBytesStats(Type.WriteChunk, data.length); - this.containerManager.getChunkManager().writeChunk(pipeline, keyName, - chunkInfo, data); + byte[] data = null; + if (msg.getWriteChunk().getStage() == ContainerProtos.Stage.WRITE_DATA + || msg.getWriteChunk().getStage() == ContainerProtos.Stage.COMBINED) { + data = msg.getWriteChunk().getData().toByteArray(); + metrics.incContainerBytesStats(Type.WriteChunk, data.length); + + } + this.containerManager.getChunkManager() + .writeChunk(pipeline, keyName, chunkInfo, + data, msg.getWriteChunk().getStage()); + return ChunkUtils.getChunkResponse(msg); } @@ -637,7 +644,7 @@ private ContainerCommandResponseProto handlePutSmallFile( metrics.incContainerBytesStats(Type.PutSmallFile, data.length); this.containerManager.getChunkManager().writeChunk(pipeline, keyData - .getKeyName(), chunkInfo, data); + .getKeyName(), chunkInfo, data, ContainerProtos.Stage.COMBINED); List chunks = new LinkedList<>(); chunks.add(chunkInfo.getProtoBufMessage()); keyData.setChunks(chunks); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java index 8e309d97a5d..c933924df2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.common.interfaces; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.scm.container.common.helpers.Pipeline; @@ -33,10 +34,12 @@ public interface ChunkManager { * @param pipeline - Name and the set of machines that make this container. * @param keyName - Name of the Key. * @param info - ChunkInfo. + * @param stage - Chunk Stage write. * @throws StorageContainerException */ void writeChunk(Pipeline pipeline, String keyName, - ChunkInfo info, byte[] data) throws StorageContainerException; + ChunkInfo info, byte[] data, ContainerProtos.Stage stage) + throws StorageContainerException; /** * reads the data defined by a chunk. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index eb9247fe09b..a4517b3b981 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -18,9 +18,10 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; -import com.google.protobuf.ByteString; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.Message; @@ -29,6 +30,7 @@ import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.statemachine.BaseStateMachine; import org.apache.ratis.statemachine.SimpleStateMachineStorage; @@ -36,10 +38,15 @@ import org.apache.ratis.statemachine.TransactionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import java.util.function.Function; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ArrayBlockingQueue; /** A {@link org.apache.ratis.statemachine.StateMachine} for containers. */ public class ContainerStateMachine extends BaseStateMachine { @@ -48,9 +55,19 @@ public class ContainerStateMachine extends BaseStateMachine { private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private final ContainerDispatcher dispatcher; + private final ThreadPoolExecutor writeChunkExecutor; + private final ConcurrentHashMap> + writeChunkMap; - ContainerStateMachine(ContainerDispatcher dispatcher) { + ContainerStateMachine(ContainerDispatcher dispatcher, + int numWriteChunkThreads) { this.dispatcher = dispatcher; + writeChunkMap = new ConcurrentHashMap<>(); + writeChunkExecutor = + new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads, + 60, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(1024), + new ThreadPoolExecutor.CallerRunsPolicy()); } @Override @@ -64,47 +81,132 @@ public void initialize( throws IOException { super.initialize(id, properties, raftStorage); storage.init(raftStorage); + writeChunkExecutor.prestartAllCoreThreads(); // TODO handle snapshots // TODO: Add a flag that tells you that initialize has been called. // Check with Ratis if this feature is done in Ratis. } + public TransactionContext startTransaction(RaftClientRequest request) + throws IOException { + final ContainerCommandRequestProto proto = + getRequestProto(request.getMessage().getContent()); + + final SMLogEntryProto log; + if (proto.getCmdType() == ContainerProtos.Type.WriteChunk) { + final WriteChunkRequestProto write = proto.getWriteChunk(); + // create the state machine data proto + final WriteChunkRequestProto dataWriteChunkProto = + WriteChunkRequestProto + .newBuilder(write) + .setStage(ContainerProtos.Stage.WRITE_DATA) + .build(); + ContainerCommandRequestProto dataContainerCommandProto = + ContainerCommandRequestProto + .newBuilder(proto) + .setWriteChunk(dataWriteChunkProto) + .build(); + + // create the log entry proto + final WriteChunkRequestProto commitWriteChunkProto = + WriteChunkRequestProto + .newBuilder(write) + .setStage(ContainerProtos.Stage.COMMIT_DATA) + .build(); + ContainerCommandRequestProto commitContainerCommandProto = + ContainerCommandRequestProto + .newBuilder(proto) + .setWriteChunk(commitWriteChunkProto) + .build(); + + log = SMLogEntryProto.newBuilder() + .setData(getShadedByteString(commitContainerCommandProto)) + .setStateMachineData(getShadedByteString(dataContainerCommandProto)) + .build(); + } else { + log = SMLogEntryProto.newBuilder() + .setData(request.getMessage().getContent()) + .build(); + } + return new TransactionContext(this, request, log); + } + + private ByteString getShadedByteString(ContainerCommandRequestProto proto) { + return ShadedProtoUtil.asShadedByteString(proto.toByteArray()); + } + + private ContainerCommandRequestProto getRequestProto(ByteString request) + throws InvalidProtocolBufferException { + return ContainerCommandRequestProto.parseFrom( + ShadedProtoUtil.asByteString(request)); + } + + private Message runCommand(ContainerCommandRequestProto requestProto) { + LOG.trace("dispatch {}", requestProto); + ContainerCommandResponseProto response = dispatcher.dispatch(requestProto); + LOG.trace("response {}", response); + return () -> ShadedProtoUtil.asShadedByteString(response.toByteArray()); + } + + @Override + public CompletableFuture writeStateMachineData(LogEntryProto entry) { + try { + final ContainerCommandRequestProto requestProto = + getRequestProto(entry.getSmLogEntry().getStateMachineData()); + final WriteChunkRequestProto write = requestProto.getWriteChunk(); + Message raftClientReply = runCommand(requestProto); + CompletableFuture future = + CompletableFuture.completedFuture(raftClientReply); + writeChunkMap.put(write.getChunkData().getChunkName(),future); + return future; + } catch (IOException e) { + return completeExceptionally(e); + } + } + @Override public CompletableFuture query(RaftClientRequest request) { - return dispatch(ShadedProtoUtil.asByteString( - request.getMessage().getContent()), - response -> new RaftClientReply(request, - () -> ShadedProtoUtil.asShadedByteString(response.toByteArray()))); + try { + final ContainerCommandRequestProto requestProto = + getRequestProto(request.getMessage().getContent()); + RaftClientReply raftClientReply = + new RaftClientReply(request, runCommand(requestProto)); + return CompletableFuture.completedFuture(raftClientReply); + } catch (IOException e) { + return completeExceptionally(e); + } } @Override public CompletableFuture applyTransaction(TransactionContext trx) { - final SMLogEntryProto logEntry = trx.getSMLogEntry(); - return dispatch(ShadedProtoUtil.asByteString(logEntry.getData()), - response -> - () -> ShadedProtoUtil.asShadedByteString(response.toByteArray()) - ); - } - - private CompletableFuture dispatch( - ByteString requestBytes, Function f) { - final ContainerCommandResponseProto response; try { - final ContainerCommandRequestProto request - = ContainerCommandRequestProto.parseFrom(requestBytes); - LOG.trace("dispatch {}", request); - response = dispatcher.dispatch(request); - LOG.trace("response {}", response); + ContainerCommandRequestProto requestProto = + getRequestProto(trx.getSMLogEntry().getData()); + + if (requestProto.getCmdType() == ContainerProtos.Type.WriteChunk) { + WriteChunkRequestProto write = requestProto.getWriteChunk(); + CompletableFuture stateMachineFuture = + writeChunkMap.remove(write.getChunkData().getChunkName()); + return stateMachineFuture + .thenComposeAsync(v -> + CompletableFuture.completedFuture(runCommand(requestProto))); + } else { + return CompletableFuture.completedFuture(runCommand(requestProto)); + } } catch (IOException e) { return completeExceptionally(e); } - return CompletableFuture.completedFuture(f.apply(response)); } - static CompletableFuture completeExceptionally(Exception e) { + private static CompletableFuture completeExceptionally(Exception e) { final CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(e); return future; } + + @Override + public void close() throws IOException { + writeChunkExecutor.shutdown(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 179c2a2f4d1..7baca257e5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -59,18 +59,31 @@ public final class XceiverServerRatis implements XceiverServerSpi { private final RaftServer server; private XceiverServerRatis(DatanodeID id, int port, String storageDir, - ContainerDispatcher dispatcher, RpcType rpcType, int maxChunkSize, - int raftSegmentSize) throws IOException { + ContainerDispatcher dispatcher, Configuration conf) throws IOException { + + final String rpcType = conf.get( + OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType); + final int raftSegmentSize = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT); + final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE; + final int numWriteChunkThreads = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT); + Objects.requireNonNull(id, "id == null"); this.port = port; - RaftProperties serverProperties = newRaftProperties(rpcType, port, + RaftProperties serverProperties = newRaftProperties(rpc, port, storageDir, maxChunkSize, raftSegmentSize); this.server = RaftServer.newBuilder() .setServerId(RatisHelper.toRaftPeerId(id)) .setGroup(RatisHelper.emptyRaftGroup()) .setProperties(serverProperties) - .setStateMachine(new ContainerStateMachine(dispatcher)) + .setStateMachine(new ContainerStateMachine(dispatcher, + numWriteChunkThreads)) .build(); } @@ -126,14 +139,6 @@ public static XceiverServerRatis newXceiverServerRatis(DatanodeID datanodeID, "storage under {}. It is a good idea to map this to an SSD disk.", storageDir); } - final String rpcType = ozoneConf.get( - OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, - OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); - final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType); - final int raftSegmentSize = - ozoneConf.getInt(ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_SIZE_KEY, - ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_SIZE_DEFAULT); - final int maxChunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE; // Get an available port on current node and // use that as the container port @@ -159,7 +164,7 @@ public static XceiverServerRatis newXceiverServerRatis(DatanodeID datanodeID, } datanodeID.setRatisPort(localPort); return new XceiverServerRatis(datanodeID, localPort, storageDir, - dispatcher, rpc, maxChunkSize, raftSegmentSize); + dispatcher, ozoneConf); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index f114fa2bd60..1ec754d7cdc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -74,6 +74,8 @@ import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData; import static org.apache.hadoop.ozone.container.ContainerTestHelper .setDataChecksum; +import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .Stage.COMBINED; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -380,7 +382,7 @@ private ChunkInfo writeChunkHelper(String containerName, String keyName, ChunkInfo info = getChunk(keyName, 0, 0, datalen); byte[] data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(pipeline, keyName, info, data); + chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED); return info; } @@ -427,7 +429,7 @@ public void testWritReadManyChunks() throws IOException, ChunkInfo info = getChunk(keyName, x, 0, datalen); byte[] data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(pipeline, keyName, info, data); + chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED); String fileName = String.format("%s.data.%d", keyName, x); fileHashMap.put(fileName, info); } @@ -490,7 +492,7 @@ public void testPartialRead() throws Exception { ChunkInfo info = getChunk(keyName, 0, 0, datalen); byte[] data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(pipeline, keyName, info, data); + chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED); byte[] readData = chunkManager.readChunk(pipeline, keyName, info); assertTrue(Arrays.equals(data, readData)); @@ -525,9 +527,9 @@ public void testOverWrite() throws IOException, ChunkInfo info = getChunk(keyName, 0, 0, datalen); byte[] data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(pipeline, keyName, info, data); + chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED); try { - chunkManager.writeChunk(pipeline, keyName, info, data); + chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED); } catch (IOException ex) { Assert.assertTrue(ex.getMessage().contains( "Rejecting write chunk request. OverWrite flag required.")); @@ -535,7 +537,7 @@ public void testOverWrite() throws IOException, // With the overwrite flag it should work now. info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true"); - chunkManager.writeChunk(pipeline, keyName, info, data); + chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED); long bytesUsed = containerManager.getBytesUsed(containerName); Assert.assertEquals(datalen, bytesUsed); @@ -573,7 +575,7 @@ public void testMultipleWriteSingleRead() throws IOException, byte[] data = getData(datalen); oldSha.update(data); setDataChecksum(info, data); - chunkManager.writeChunk(pipeline, keyName, info, data); + chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED); } // Request to read the whole data in a single go. @@ -607,7 +609,7 @@ public void testDeleteChunk() throws IOException, ChunkInfo info = getChunk(keyName, 0, 0, datalen); byte[] data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(pipeline, keyName, info, data); + chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED); chunkManager.deleteChunk(pipeline, keyName, info); exception.expect(StorageContainerException.class); exception.expectMessage("Unable to find the chunk file."); @@ -661,7 +663,7 @@ public void testPutKeyWithLotsOfChunks() throws IOException, info = getChunk(keyName, x, x * datalen, datalen); byte[] data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(pipeline, keyName, info, data); + chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED); totalSize += datalen * (x + 1); chunkList.add(info); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java index b63f54ef7a7..9a1585b0add 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java @@ -71,7 +71,8 @@ public void testContainerMetrics() throws Exception { ChunkManager chunkManager = mock(ChunkManager.class); Mockito.doNothing().when(chunkManager).writeChunk( Mockito.any(Pipeline.class), Mockito.anyString(), - Mockito.any(ChunkInfo.class), Mockito.any(byte[].class)); + Mockito.any(ChunkInfo.class), Mockito.any(byte[].class), + Mockito.any(ContainerProtos.Stage.class)); Mockito.doReturn(chunkManager).when(containerManager).getChunkManager(); Mockito.doReturn(true).when(containerManager).isOpen(containerName);