HDFS-12853. Ozone: Optimize chunk writes for Ratis by avoiding double writes. Contributed by Mukul Kumar Singh

Tsz-Wo Nicholas Sze 2018-01-09 12:31:41 +08:00 committed by Owen O'Malley
parent 5cc5149bd8
commit 3965f1ec99
11 changed files with 247 additions and 67 deletions

OzoneConfigKeys.java

@@ -221,6 +221,16 @@ public final class OzoneConfigKeys {
       = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY;
   public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT
       = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT;
+  public static final String DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY;
+  public static final int DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT;
+  public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY;
+  public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT;
+  public static final int DFS_CONTAINER_CHUNK_MAX_SIZE
+      = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
   public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =
       "dfs.container.ratis.datanode.storage.dir";

OzoneConsts.java

@@ -62,6 +62,8 @@ public final class OzoneConsts {
   public static final String CONTAINER_PREFIX  = "containers";
   public static final String CONTAINER_META_PATH = "metadata";
   public static final String CONTAINER_DATA_PATH = "data";
+  public static final String CONTAINER_TEMPORARY_CHUNK_PREFIX = "tmp";
+  public static final String CONTAINER_CHUNK_NAME_DELIMITER = ".";
   public static final String CONTAINER_ROOT_PREFIX = "repository";
   public static final String FILE_HASH = "SHA-256";

ScmConfigKeys.java

@@ -50,6 +50,14 @@ public final class ScmConfigKeys {
       = "dfs.container.ratis.rpc.type";
   public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT
       = "GRPC";
+  public static final String DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY
+      = "dfs.container.ratis.num.write.chunk.threads";
+  public static final int DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT
+      = 60;
+  public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY =
+      "dfs.container.ratis.segment.size";
+  public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT =
+      128 * 1024 * 1024;

   // TODO : this is copied from OzoneConsts, may need to move to a better place
   public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size";
@@ -57,11 +65,6 @@ public final class ScmConfigKeys {
   public static final int OZONE_SCM_CHUNK_SIZE_DEFAULT = 16 * 1024 * 1024;
   public static final int OZONE_SCM_CHUNK_MAX_SIZE = 32 * 1024 * 1024;

-  public static final String OZONE_SCM_RATIS_SEGMENT_SIZE_KEY =
-      "ozone.scm.ratis.segment.size";
-  public static final int OZONE_SCM_RATIS_SEGMENT_SIZE_DEFAULT =
-      128 * 1024 * 1024;
   public static final String OZONE_SCM_CLIENT_PORT_KEY =
       "ozone.scm.client.port";
   public static final int OZONE_SCM_CLIENT_PORT_DEFAULT = 9860;

DatanodeContainerProtocol.proto

@@ -325,11 +325,18 @@ message ChunkInfo {
   repeated KeyValue metadata = 5;
 }

+enum Stage {
+  WRITE_DATA = 1;
+  COMMIT_DATA = 2;
+  COMBINED = 3;
+}
+
 message  WriteChunkRequestProto {
   required Pipeline pipeline = 1;
   required string keyName = 2;
   required ChunkInfo chunkData = 3;
-  required bytes data = 4;
+  optional bytes data = 4;
+  optional Stage stage = 5 [default = COMBINED];
 }

 message  WriteChunkResponseProto {
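
The stage split is why data changes from required to optional above: a COMMIT_DATA request names the chunk but need not carry its bytes again. A hedged sketch of building a commit-only request with the generated protobuf API (pipeline, keyName, and chunkInfo are assumed to already be in scope):

ContainerProtos.WriteChunkRequestProto commitOnly =
    ContainerProtos.WriteChunkRequestProto.newBuilder()
        .setPipeline(pipeline)
        .setKeyName(keyName)
        .setChunkData(chunkInfo)
        // no setData(...) call: the payload travels with the WRITE_DATA stage
        .setStage(ContainerProtos.Stage.COMMIT_DATA)
        .build();
assert !commitOnly.hasData();  // legal now that the field is optional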

ChunkManagerImpl.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.container.common.impl;

 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
@@ -30,7 +32,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
 import java.security.NoSuchAlgorithmException;
 import java.util.concurrent.ExecutionException;
@@ -67,7 +72,7 @@ public class ChunkManagerImpl implements ChunkManager {
    */
   @Override
   public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info,
-      byte[] data)
+      byte[] data, ContainerProtos.Stage stage)
       throws StorageContainerException {
     // we don't want container manager to go away while we are writing chunks.
     containerManager.readLock();
@@ -81,13 +86,23 @@ public class ChunkManagerImpl implements ChunkManager {
       ContainerData container =
           containerManager.readContainer(containerName);
       File chunkFile = ChunkUtils.validateChunk(pipeline, container, info);
-      long oldSize = chunkFile.length();
-      ChunkUtils.writeData(chunkFile, info, data);
-      containerManager.incrWriteBytes(containerName, info.getLen());
-      containerManager.incrWriteCount(containerName);
-      long newSize = chunkFile.length();
-      containerManager.incrBytesUsed(containerName, newSize - oldSize);
-    } catch (ExecutionException | NoSuchAlgorithmException e) {
+      File tmpChunkFile = getTmpChunkFile(chunkFile, info);
+
+      LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file",
+          info.getChunkName(), stage, chunkFile, tmpChunkFile);
+      switch (stage) {
+      case WRITE_DATA:
+        ChunkUtils.writeData(tmpChunkFile, info, data);
+        break;
+      case COMMIT_DATA:
+        commitChunk(tmpChunkFile, chunkFile, containerName, info.getLen());
+        break;
+      case COMBINED:
+        ChunkUtils.writeData(tmpChunkFile, info, data);
+        commitChunk(tmpChunkFile, chunkFile, containerName, info.getLen());
+        break;
+      }
+    } catch (ExecutionException | NoSuchAlgorithmException | IOException e) {
       LOG.error("write data failed. error: {}", e);
       throw new StorageContainerException("Internal error: ", e,
           CONTAINER_INTERNAL_ERROR);
@@ -101,6 +116,29 @@ public class ChunkManagerImpl implements ChunkManager {
     }
   }

+  // Create a temporary file in the same container directory
+  // in the format "<chunkname>.tmp"
+  private static File getTmpChunkFile(File chunkFile, ChunkInfo info)
+      throws StorageContainerException {
+    return new File(chunkFile.getParent(),
+        chunkFile.getName() +
+            OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
+            OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX);
+  }
+
+  // Commit the chunk by renaming the temporary chunk file to chunk file
+  private void commitChunk(File tmpChunkFile, File chunkFile,
+      String containerName, long chunkLen) throws IOException {
+    long sizeDiff = tmpChunkFile.length() - chunkFile.length();
+    // It is safe to replace here as the earlier chunk if existing should be
+    // caught as part of validateChunk
+    Files.move(tmpChunkFile.toPath(), chunkFile.toPath(),
+        StandardCopyOption.REPLACE_EXISTING);
+    containerManager.incrBytesUsed(containerName, sizeDiff);
+    containerManager.incrWriteCount(containerName);
+    containerManager.incrWriteBytes(containerName, chunkLen);
+  }
+
   /**
    * reads the data defined by a chunk.
    *

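The two-stage path above separates the expensive data write (into "<chunkname>.tmp") from the cheap commit (a rename onto the final name). A self-contained sketch of the same pattern with plain java.nio.file, outside any Ozone types (file names here are made up):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class TmpChunkCommitSketch {
  public static void main(String[] args) throws IOException {
    Path chunkFile = Paths.get("chunk_0.data");
    // WRITE_DATA: stream the bytes into a sibling "<name>.tmp" file.
    Path tmpChunkFile = chunkFile.resolveSibling(chunkFile.getFileName() + ".tmp");
    Files.write(tmpChunkFile, new byte[]{42, 42, 42});
    // COMMIT_DATA: publish with a single rename; REPLACE_EXISTING matches
    // commitChunk above, which relies on validateChunk to reject overwrites.
    Files.move(tmpChunkFile, chunkFile, StandardCopyOption.REPLACE_EXISTING);
    System.out.println("committed " + Files.size(chunkFile) + " bytes");
  }
}
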
Dispatcher.java

@@ -456,10 +456,17 @@ public class Dispatcher implements ContainerDispatcher {
       ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getWriteChunk()
           .getChunkData());
       Preconditions.checkNotNull(chunkInfo);
-      byte[] data = msg.getWriteChunk().getData().toByteArray();
-      metrics.incContainerBytesStats(Type.WriteChunk, data.length);
-      this.containerManager.getChunkManager().writeChunk(pipeline, keyName,
-          chunkInfo, data);
+      byte[] data = null;
+      if (msg.getWriteChunk().getStage() == ContainerProtos.Stage.WRITE_DATA
+          || msg.getWriteChunk().getStage() == ContainerProtos.Stage.COMBINED) {
+        data = msg.getWriteChunk().getData().toByteArray();
+        metrics.incContainerBytesStats(Type.WriteChunk, data.length);
+      }
+
+      this.containerManager.getChunkManager()
+          .writeChunk(pipeline, keyName, chunkInfo,
+              data, msg.getWriteChunk().getStage());
+
       return ChunkUtils.getChunkResponse(msg);
     }
@@ -637,7 +644,7 @@ public class Dispatcher implements ContainerDispatcher {
       metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
       this.containerManager.getChunkManager().writeChunk(pipeline, keyData
-          .getKeyName(), chunkInfo, data);
+          .getKeyName(), chunkInfo, data, ContainerProtos.Stage.COMBINED);
       List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
       chunks.add(chunkInfo.getProtoBufMessage());
       keyData.setChunks(chunks);

ChunkManager.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.container.common.interfaces;

+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@@ -33,10 +34,12 @@ public interface ChunkManager {
    * @param pipeline - Name and the set of machines that make this container.
    * @param keyName - Name of the Key.
    * @param info - ChunkInfo.
+   * @param stage - Chunk Stage write.
    * @throws StorageContainerException
    */
   void writeChunk(Pipeline pipeline, String keyName,
-      ChunkInfo info, byte[] data) throws StorageContainerException;
+      ChunkInfo info, byte[] data, ContainerProtos.Stage stage)
+      throws StorageContainerException;

   /**
    * reads the data defined by a chunk.

ContainerStateMachine.java

@@ -18,9 +18,10 @@

 package org.apache.hadoop.ozone.container.common.transport.server.ratis;

-import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.Message;
@@ -29,6 +30,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
 import org.apache.ratis.statemachine.BaseStateMachine;
 import org.apache.ratis.statemachine.SimpleStateMachineStorage;
@@ -36,10 +38,15 @@ import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;

 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ArrayBlockingQueue;

 /** A {@link org.apache.ratis.statemachine.StateMachine} for containers. */
 public class ContainerStateMachine extends BaseStateMachine {
@@ -48,9 +55,19 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final SimpleStateMachineStorage storage
       = new SimpleStateMachineStorage();
   private final ContainerDispatcher dispatcher;
+  private final ThreadPoolExecutor writeChunkExecutor;
+  private final ConcurrentHashMap<String, CompletableFuture<Message>>
+      writeChunkMap;

-  ContainerStateMachine(ContainerDispatcher dispatcher) {
+  ContainerStateMachine(ContainerDispatcher dispatcher,
+      int numWriteChunkThreads) {
     this.dispatcher = dispatcher;
+    writeChunkMap = new ConcurrentHashMap<>();
+    writeChunkExecutor =
+        new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
+            60, TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(1024),
+            new ThreadPoolExecutor.CallerRunsPolicy());
   }

   @Override
@@ -64,47 +81,132 @@ public class ContainerStateMachine extends BaseStateMachine {
       throws IOException {
     super.initialize(id, properties, raftStorage);
     storage.init(raftStorage);
+    writeChunkExecutor.prestartAllCoreThreads();

     //  TODO handle snapshots
     // TODO: Add a flag that tells you that initialize has been called.
     // Check with Ratis if this feature is done in Ratis.
   }

+  public TransactionContext startTransaction(RaftClientRequest request)
+      throws IOException {
+    final ContainerCommandRequestProto proto =
+        getRequestProto(request.getMessage().getContent());
+
+    final SMLogEntryProto log;
+    if (proto.getCmdType() == ContainerProtos.Type.WriteChunk) {
+      final WriteChunkRequestProto write = proto.getWriteChunk();
+      // create the state machine data proto
+      final WriteChunkRequestProto dataWriteChunkProto =
+          WriteChunkRequestProto
+              .newBuilder(write)
+              .setStage(ContainerProtos.Stage.WRITE_DATA)
+              .build();
+      ContainerCommandRequestProto dataContainerCommandProto =
+          ContainerCommandRequestProto
+              .newBuilder(proto)
+              .setWriteChunk(dataWriteChunkProto)
+              .build();
+
+      // create the log entry proto
+      final WriteChunkRequestProto commitWriteChunkProto =
+          WriteChunkRequestProto
+              .newBuilder(write)
+              .setStage(ContainerProtos.Stage.COMMIT_DATA)
+              .build();
+      ContainerCommandRequestProto commitContainerCommandProto =
+          ContainerCommandRequestProto
+              .newBuilder(proto)
+              .setWriteChunk(commitWriteChunkProto)
+              .build();
+
+      log = SMLogEntryProto.newBuilder()
+          .setData(getShadedByteString(commitContainerCommandProto))
+          .setStateMachineData(getShadedByteString(dataContainerCommandProto))
+          .build();
+    } else {
+      log = SMLogEntryProto.newBuilder()
+          .setData(request.getMessage().getContent())
+          .build();
+    }
+    return new TransactionContext(this, request, log);
+  }
+
+  private ByteString getShadedByteString(ContainerCommandRequestProto proto) {
+    return ShadedProtoUtil.asShadedByteString(proto.toByteArray());
+  }
+
+  private ContainerCommandRequestProto getRequestProto(ByteString request)
+      throws InvalidProtocolBufferException {
+    return ContainerCommandRequestProto.parseFrom(
+        ShadedProtoUtil.asByteString(request));
+  }
+
+  private Message runCommand(ContainerCommandRequestProto requestProto) {
+    LOG.trace("dispatch {}", requestProto);
+    ContainerCommandResponseProto response = dispatcher.dispatch(requestProto);
+    LOG.trace("response {}", response);
+    return () -> ShadedProtoUtil.asShadedByteString(response.toByteArray());
+  }
+
+  @Override
+  public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
+    try {
+      final ContainerCommandRequestProto requestProto =
+          getRequestProto(entry.getSmLogEntry().getStateMachineData());
+      final WriteChunkRequestProto write = requestProto.getWriteChunk();
+      Message raftClientReply = runCommand(requestProto);
+      CompletableFuture<Message> future =
+          CompletableFuture.completedFuture(raftClientReply);
+      writeChunkMap.put(write.getChunkData().getChunkName(), future);
+      return future;
+    } catch (IOException e) {
+      return completeExceptionally(e);
+    }
+  }
+
   @Override
   public CompletableFuture<RaftClientReply> query(RaftClientRequest request) {
-    return dispatch(ShadedProtoUtil.asByteString(
-        request.getMessage().getContent()),
-        response -> new RaftClientReply(request,
-            () -> ShadedProtoUtil.asShadedByteString(response.toByteArray())));
+    try {
+      final ContainerCommandRequestProto requestProto =
+          getRequestProto(request.getMessage().getContent());
+      RaftClientReply raftClientReply =
+          new RaftClientReply(request, runCommand(requestProto));
+      return CompletableFuture.completedFuture(raftClientReply);
+    } catch (IOException e) {
+      return completeExceptionally(e);
+    }
   }

   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-    final SMLogEntryProto logEntry = trx.getSMLogEntry();
-    return dispatch(ShadedProtoUtil.asByteString(logEntry.getData()),
-        response ->
-            () -> ShadedProtoUtil.asShadedByteString(response.toByteArray())
-    );
-  }
-
-  private <T> CompletableFuture<T> dispatch(
-      ByteString requestBytes, Function<ContainerCommandResponseProto, T> f) {
-    final ContainerCommandResponseProto response;
     try {
-      final ContainerCommandRequestProto request
-          = ContainerCommandRequestProto.parseFrom(requestBytes);
-      LOG.trace("dispatch {}", request);
-      response = dispatcher.dispatch(request);
-      LOG.trace("response {}", response);
+      ContainerCommandRequestProto requestProto =
+          getRequestProto(trx.getSMLogEntry().getData());
+
+      if (requestProto.getCmdType() == ContainerProtos.Type.WriteChunk) {
+        WriteChunkRequestProto write = requestProto.getWriteChunk();
+        CompletableFuture<Message> stateMachineFuture =
+            writeChunkMap.remove(write.getChunkData().getChunkName());
+        return stateMachineFuture
+            .thenComposeAsync(v ->
+                CompletableFuture.completedFuture(runCommand(requestProto)));
+      } else {
+        return CompletableFuture.completedFuture(runCommand(requestProto));
+      }
     } catch (IOException e) {
       return completeExceptionally(e);
     }
-    return CompletableFuture.completedFuture(f.apply(response));
   }

-  static <T> CompletableFuture<T> completeExceptionally(Exception e) {
+  private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
     final CompletableFuture<T> future = new CompletableFuture<>();
     future.completeExceptionally(e);
     return future;
   }

+  @Override
+  public void close() throws IOException {
+    writeChunkExecutor.shutdown();
+  }
 }
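
The pattern worth noting in this class: writeStateMachineData performs the WRITE_DATA stage when Ratis appends the log entry and parks the resulting future in writeChunkMap, and applyTransaction later chains the COMMIT_DATA stage onto that future, so a chunk is never committed before its data write has completed. A stripped-down sketch of the same handoff, with plain strings standing in for the Ozone protos (class and method bodies here are illustrative only):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class WriteCommitHandoffSketch {
  private final ConcurrentHashMap<String, CompletableFuture<String>> inFlight =
      new ConcurrentHashMap<>();

  // Log-append time: do the data write and remember its future, keyed by chunk name.
  CompletableFuture<String> writeStateMachineData(String chunkName) {
    CompletableFuture<String> dataWritten =
        CompletableFuture.completedFuture("data written for " + chunkName);
    inFlight.put(chunkName, dataWritten);
    return dataWritten;
  }

  // Apply time: chain the commit after the matching data write.
  CompletableFuture<String> applyTransaction(String chunkName) {
    return inFlight.remove(chunkName)
        .thenComposeAsync(ignored ->
            CompletableFuture.completedFuture("committed " + chunkName));
  }

  public static void main(String[] args) {
    WriteCommitHandoffSketch sm = new WriteCommitHandoffSketch();
    sm.writeStateMachineData("chunk_0");
    System.out.println(sm.applyTransaction("chunk_0").join());
  }
}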

XceiverServerRatis.java

@@ -59,18 +59,31 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   private final RaftServer server;

   private XceiverServerRatis(DatanodeID id, int port, String storageDir,
-      ContainerDispatcher dispatcher, RpcType rpcType, int maxChunkSize,
-      int raftSegmentSize) throws IOException {
+      ContainerDispatcher dispatcher, Configuration conf) throws IOException {
+    final String rpcType = conf.get(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
+    final int raftSegmentSize = conf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT);
+    final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE;
+    final int numWriteChunkThreads = conf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
+
     Objects.requireNonNull(id, "id == null");
     this.port = port;

-    RaftProperties serverProperties = newRaftProperties(rpcType, port,
+    RaftProperties serverProperties = newRaftProperties(rpc, port,
         storageDir, maxChunkSize, raftSegmentSize);

     this.server = RaftServer.newBuilder()
         .setServerId(RatisHelper.toRaftPeerId(id))
         .setGroup(RatisHelper.emptyRaftGroup())
         .setProperties(serverProperties)
-        .setStateMachine(new ContainerStateMachine(dispatcher))
+        .setStateMachine(new ContainerStateMachine(dispatcher,
+            numWriteChunkThreads))
         .build();
   }

@@ -126,14 +139,6 @@ public final class XceiverServerRatis implements XceiverServerSpi {
           "storage under {}. It is a good idea to map this to an SSD disk.",
           storageDir);
     }
-    final String rpcType = ozoneConf.get(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
-    final int raftSegmentSize =
-        ozoneConf.getInt(ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_SIZE_KEY,
-            ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_SIZE_DEFAULT);
-    final int maxChunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;

     // Get an available port on current node and
     // use that as the container port
@@ -159,7 +164,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     }
     datanodeID.setRatisPort(localPort);
     return new XceiverServerRatis(datanodeID, localPort, storageDir,
-        dispatcher, rpc, maxChunkSize, raftSegmentSize);
+        dispatcher, ozoneConf);
   }

   @Override

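XceiverServerRatis now resolves every Ratis tunable from the Configuration itself and hands the thread count to the state machine, whose pool pairs a bounded queue with CallerRunsPolicy: once 1024 writes are queued, the submitting thread executes the task itself, throttling further submissions instead of rejecting them. A small sketch of that saturation behaviour in isolation (pool sizes shrunk so the effect is visible):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsSketch {
  public static void main(String[] args) throws InterruptedException {
    // Same shape as the write-chunk pool: fixed size, bounded queue, caller-runs.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2, 2, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(2),
        new ThreadPoolExecutor.CallerRunsPolicy());
    for (int i = 0; i < 8; i++) {
      final int n = i;
      // With all workers busy and the queue full, some of these
      // run directly on the main (submitting) thread.
      pool.execute(() -> System.out.println(
          "task " + n + " on " + Thread.currentThread().getName()));
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}
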
TestContainerPersistence.java

@@ -74,6 +74,8 @@ import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper
     .setDataChecksum;
+import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .Stage.COMBINED;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -380,7 +382,7 @@ public class TestContainerPersistence {
     ChunkInfo info = getChunk(keyName, 0, 0, datalen);
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
-    chunkManager.writeChunk(pipeline, keyName, info, data);
+    chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED);
     return info;
   }
@@ -427,7 +429,7 @@
       ChunkInfo info = getChunk(keyName, x, 0, datalen);
       byte[] data = getData(datalen);
       setDataChecksum(info, data);
-      chunkManager.writeChunk(pipeline, keyName, info, data);
+      chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED);
       String fileName = String.format("%s.data.%d", keyName, x);
       fileHashMap.put(fileName, info);
     }
@@ -490,7 +492,7 @@
     ChunkInfo info = getChunk(keyName, 0, 0, datalen);
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
-    chunkManager.writeChunk(pipeline, keyName, info, data);
+    chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED);

     byte[] readData = chunkManager.readChunk(pipeline, keyName, info);
     assertTrue(Arrays.equals(data, readData));
@@ -525,9 +527,9 @@
     ChunkInfo info = getChunk(keyName, 0, 0, datalen);
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
-    chunkManager.writeChunk(pipeline, keyName, info, data);
+    chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED);
     try {
-      chunkManager.writeChunk(pipeline, keyName, info, data);
+      chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED);
     } catch (IOException ex) {
       Assert.assertTrue(ex.getMessage().contains(
           "Rejecting write chunk request. OverWrite flag required."));
@@ -535,7 +537,7 @@
     // With the overwrite flag it should work now.
     info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
-    chunkManager.writeChunk(pipeline, keyName, info, data);
+    chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED);

     long bytesUsed = containerManager.getBytesUsed(containerName);
     Assert.assertEquals(datalen, bytesUsed);
@@ -573,7 +575,7 @@
       byte[] data = getData(datalen);
       oldSha.update(data);
       setDataChecksum(info, data);
-      chunkManager.writeChunk(pipeline, keyName, info, data);
+      chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED);
     }

     // Request to read the whole data in a single go.
@@ -607,7 +609,7 @@
     ChunkInfo info = getChunk(keyName, 0, 0, datalen);
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
-    chunkManager.writeChunk(pipeline, keyName, info, data);
+    chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED);
     chunkManager.deleteChunk(pipeline, keyName, info);
     exception.expect(StorageContainerException.class);
     exception.expectMessage("Unable to find the chunk file.");
@@ -661,7 +663,7 @@
       info = getChunk(keyName, x, x * datalen, datalen);
       byte[] data = getData(datalen);
       setDataChecksum(info, data);
-      chunkManager.writeChunk(pipeline, keyName, info, data);
+      chunkManager.writeChunk(pipeline, keyName, info, data, COMBINED);
       totalSize += datalen * (x + 1);
       chunkList.add(info);
     }

TestContainerMetrics.java

@@ -71,7 +71,8 @@ public class TestContainerMetrics {
     ChunkManager chunkManager = mock(ChunkManager.class);
     Mockito.doNothing().when(chunkManager).writeChunk(
         Mockito.any(Pipeline.class), Mockito.anyString(),
-        Mockito.any(ChunkInfo.class), Mockito.any(byte[].class));
+        Mockito.any(ChunkInfo.class), Mockito.any(byte[].class),
+        Mockito.any(ContainerProtos.Stage.class));
     Mockito.doReturn(chunkManager).when(containerManager).getChunkManager();
     Mockito.doReturn(true).when(containerManager).isOpen(containerName);