HDDS-887. Add DispatcherContext info to Dispatcher from containerStateMachine. Contributed by Shashikant Banerjee.

Shashikant Banerjee 2018-12-02 08:00:35 +05:30
parent d15dc43659
commit 5a3c7714c4
20 changed files with 321 additions and 188 deletions


@@ -373,17 +373,10 @@ enum ChecksumType {
   MD5 = 5;
 }
 
-enum Stage {
-  WRITE_DATA = 1;
-  COMMIT_DATA = 2;
-  COMBINED = 3;
-}
-
 message WriteChunkRequestProto {
   required DatanodeBlockID blockID = 1;
   required ChunkInfo chunkData = 2;
   optional bytes data = 3;
-  optional Stage stage = 4 [default = COMBINED];
 }
 
 message WriteChunkResponseProto {
@@ -392,7 +385,6 @@ message WriteChunkResponseProto {
 message ReadChunkRequestProto {
   required DatanodeBlockID blockID = 1;
   required ChunkInfo chunkData = 2;
-  optional bool readFromTmpFile = 3 [default = false];
 }
 
 message ReadChunkResponseProto {


@@ -47,6 +47,8 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+    .DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -133,7 +135,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
 
   @Override
   public ContainerCommandResponseProto dispatch(
-      ContainerCommandRequestProto msg) {
+      ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
     Preconditions.checkNotNull(msg);
     LOG.trace("Command {}, trace ID: {} ", msg.getCmdType().toString(),
         msg.getTraceID());
@@ -194,7 +196,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
       audit(action, eventType, params, AuditEventStatus.FAILURE, ex);
       return ContainerUtils.logAndReturnError(LOG, ex, msg);
     }
-    responseProto = handler.handle(msg, container);
+    responseProto = handler.handle(msg, container, dispatcherContext);
     if (responseProto != null) {
       metrics.incContainerOpsLatencies(cmdType, System.nanoTime() - startTime);
@@ -269,7 +271,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
     // TODO: Assuming the container type to be KeyValueContainer for now.
     // We need to get container type from the containerRequest.
     Handler handler = getHandler(containerType);
-    handler.handle(requestBuilder.build(), null);
+    handler.handle(requestBuilder.build(), null, null);
   }
 
   /**


@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 
 /**
  * Dispatcher acts as the bridge between the transport layer and
@@ -37,9 +38,11 @@ public interface ContainerDispatcher {
   /**
    * Dispatches commands to container layer.
    * @param msg - Command Request
+   * @param context - Context info related to ContainerStateMachine
    * @return Command Response
    */
-  ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg);
+  ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg,
+      DispatcherContext context);
 
   /**
    * Validates whether the container command should be executed on the pipeline
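
Illustration (not part of this patch): a minimal sketch of how a transport-layer caller would use the new two-argument dispatch, based on the interfaces in this diff. The term, logIndex and requestProto variables are assumed to come from the surrounding Ratis code; standalone callers simply pass null for the context, as the gRPC server changes below do.

// Ratis-driven call: build a DispatcherContext from the log entry and pass it
// through dispatch; a null context keeps the old standalone behaviour.
DispatcherContext context = new DispatcherContext.Builder()
    .setTerm(term)            // Ratis term of the log entry
    .setLogIndex(logIndex)    // Raft log index of the log entry
    .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
    .build();
ContainerCommandResponseProto response = dispatcher.dispatch(requestProto, context);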


@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
@@ -103,7 +104,8 @@ public abstract class Handler {
   }
 
   public abstract ContainerCommandResponseProto handle(
-      ContainerCommandRequestProto msg, Container container);
+      ContainerCommandRequestProto msg, Container container,
+      DispatcherContext dispatcherContext);
 
   /**
    * Import container data from a raw input stream.


@@ -53,7 +53,8 @@ public class GrpcXceiverService extends
       @Override
       public void onNext(ContainerCommandRequestProto request) {
         try {
-          ContainerCommandResponseProto resp = dispatcher.dispatch(request);
+          ContainerCommandResponseProto resp =
+              dispatcher.dispatch(request, null);
           responseObserver.onNext(resp);
         } catch (Throwable e) {
           LOG.error("{} got exception when processing"


@@ -130,7 +130,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
   public void submitRequest(ContainerCommandRequestProto request,
       HddsProtos.PipelineID pipelineID) throws IOException {
     ContainerProtos.ContainerCommandResponseProto response =
-        storageContainer.dispatch(request);
+        storageContainer.dispatch(request, null);
     if (response.getResult() != ContainerProtos.Result.SUCCESS) {
       throw new StorageContainerException(response.getMessage(),
           response.getResult());


@@ -22,8 +22,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.hdds.HddsUtils;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
@@ -35,7 +33,6 @@ import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.thirdparty.com.google.protobuf
     .InvalidProtocolBufferException;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -237,7 +234,6 @@ public class ContainerStateMachine extends BaseStateMachine {
     final WriteChunkRequestProto dataWriteChunkProto =
         WriteChunkRequestProto
             .newBuilder(write)
-            .setStage(Stage.WRITE_DATA)
             .build();
     ContainerCommandRequestProto dataContainerCommandProto =
         ContainerCommandRequestProto
@@ -252,7 +248,6 @@ public class ContainerStateMachine extends BaseStateMachine {
             .setChunkData(write.getChunkData())
             // skipping the data field as it is
             // already set in statemachine data proto
-            .setStage(Stage.COMMIT_DATA)
             .build();
     ContainerCommandRequestProto commitContainerCommandProto =
         ContainerCommandRequestProto
@@ -292,15 +287,18 @@ public class ContainerStateMachine extends BaseStateMachine {
   }
 
   private ContainerCommandResponseProto dispatchCommand(
-      ContainerCommandRequestProto requestProto) {
+      ContainerCommandRequestProto requestProto,
+      DispatcherContext context) {
     LOG.trace("dispatch {}", requestProto);
-    ContainerCommandResponseProto response = dispatcher.dispatch(requestProto);
+    ContainerCommandResponseProto response =
+        dispatcher.dispatch(requestProto, context);
     LOG.trace("response {}", response);
     return response;
   }
 
-  private Message runCommand(ContainerCommandRequestProto requestProto) {
-    return dispatchCommand(requestProto)::toByteString;
+  private Message runCommand(ContainerCommandRequestProto requestProto,
+      DispatcherContext context) {
+    return dispatchCommand(requestProto, context)::toByteString;
   }
 
   private ExecutorService getCommandExecutor(
@@ -310,7 +308,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   }
 
   private CompletableFuture<Message> handleWriteChunk(
-      ContainerCommandRequestProto requestProto, long entryIndex) {
+      ContainerCommandRequestProto requestProto, long entryIndex, long term) {
     final WriteChunkRequestProto write = requestProto.getWriteChunk();
     RaftServer server = ratisServer.getServer();
     Preconditions.checkState(server instanceof RaftServerProxy);
@@ -321,8 +319,14 @@ public class ContainerStateMachine extends BaseStateMachine {
     } catch (IOException ioe) {
       return completeExceptionally(ioe);
     }
+    DispatcherContext context =
+        new DispatcherContext.Builder()
+            .setTerm(term)
+            .setLogIndex(entryIndex)
+            .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
+            .build();
     CompletableFuture<Message> writeChunkFuture = CompletableFuture
-        .supplyAsync(() -> runCommand(requestProto), chunkExecutor);
+        .supplyAsync(() -> runCommand(requestProto, context), chunkExecutor);
     writeChunkFutureMap.put(entryIndex, writeChunkFuture);
     LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID()
         + " logIndex " + entryIndex + " chunkName " + write.getChunkData()
@@ -355,7 +359,8 @@ public class ContainerStateMachine extends BaseStateMachine {
       // CreateContainer will happen as a part of writeChunk only.
       switch (cmdType) {
       case WriteChunk:
-        return handleWriteChunk(requestProto, entry.getIndex());
+        return handleWriteChunk(requestProto, entry.getIndex(),
+            entry.getTerm());
       default:
         throw new IllegalStateException("Cmd Type:" + cmdType
             + " should not have state machine data");
@@ -372,39 +377,36 @@ public class ContainerStateMachine extends BaseStateMachine {
       metrics.incNumReadStateMachineOps();
       final ContainerCommandRequestProto requestProto =
           getRequestProto(request.getContent());
-      return CompletableFuture.completedFuture(runCommand(requestProto));
+      return CompletableFuture.completedFuture(runCommand(requestProto, null));
     } catch (IOException e) {
       metrics.incNumReadStateMachineFails();
       return completeExceptionally(e);
     }
   }
 
-  private ByteString readStateMachineData(ContainerCommandRequestProto
-      requestProto) {
+  private ByteString readStateMachineData(
+      ContainerCommandRequestProto requestProto, long term, long index) {
     WriteChunkRequestProto writeChunkRequestProto =
         requestProto.getWriteChunk();
-    // Assert that store log entry is for COMMIT_DATA, the WRITE_DATA is
-    // written through writeStateMachineData.
-    Preconditions
-        .checkArgument(writeChunkRequestProto.getStage() == Stage.COMMIT_DATA);
     // prepare the chunk to be read
     ReadChunkRequestProto.Builder readChunkRequestProto =
         ReadChunkRequestProto.newBuilder()
            .setBlockID(writeChunkRequestProto.getBlockID())
-           .setChunkData(writeChunkRequestProto.getChunkData())
-           // set readFromTempFile to true in case, the chunkFile does
-           // not exist as applyTransaction is not executed for this entry yet.
-           .setReadFromTmpFile(true);
+           .setChunkData(writeChunkRequestProto.getChunkData());
     ContainerCommandRequestProto dataContainerCommandProto =
         ContainerCommandRequestProto.newBuilder(requestProto)
             .setCmdType(Type.ReadChunk)
             .setReadChunk(readChunkRequestProto)
             .build();
+    DispatcherContext context =
+        new DispatcherContext.Builder()
+            .setTerm(term)
+            .setLogIndex(index)
+            .setReadFromTmpFile(true)
+            .build();
     // read the chunk
     ContainerCommandResponseProto response =
-        dispatchCommand(dataContainerCommandProto);
+        dispatchCommand(dataContainerCommandProto, context);
     ReadChunkResponseProto responseProto = response.getReadChunk();
     ByteString data = responseProto.getData();
@@ -416,14 +418,14 @@ public class ContainerStateMachine extends BaseStateMachine {
   /**
    * Reads the Entry from the Cache or loads it back by reading from disk.
    */
-  private ByteString getCachedStateMachineData(Long logIndex,
+  private ByteString getCachedStateMachineData(Long logIndex, long term,
       ContainerCommandRequestProto requestProto) throws ExecutionException {
     try {
       return reconstructWriteChunkRequest(
           stateMachineDataCache.get(logIndex, new Callable<ByteString>() {
             @Override
             public ByteString call() throws Exception {
-              return readStateMachineData(requestProto);
+              return readStateMachineData(requestProto, term, logIndex);
             }
           }), requestProto);
     } catch (ExecutionException e) {
@@ -439,7 +441,7 @@ public class ContainerStateMachine extends BaseStateMachine {
     final WriteChunkRequestProto.Builder dataWriteChunkProto =
         WriteChunkRequestProto.newBuilder(writeChunkRequestProto)
             // adding the state machine data
-            .setData(data).setStage(Stage.WRITE_DATA);
+            .setData(data);
 
     ContainerCommandRequestProto.Builder newStateMachineProto =
         ContainerCommandRequestProto.newBuilder(requestProto)
@@ -486,7 +488,8 @@ public class ContainerStateMachine extends BaseStateMachine {
     CompletableFuture<ByteString> future = new CompletableFuture<>();
     return future.supplyAsync(() -> {
       try {
-        return getCachedStateMachineData(entry.getIndex(), requestProto);
+        return getCachedStateMachineData(entry.getIndex(), entry.getTerm(),
+            requestProto);
       } catch (ExecutionException e) {
         future.completeExceptionally(e);
         return null;
@@ -524,6 +527,10 @@ public class ContainerStateMachine extends BaseStateMachine {
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     long index = trx.getLogEntry().getIndex();
+    DispatcherContext.Builder builder =
+        new DispatcherContext.Builder()
+            .setTerm(trx.getLogEntry().getTerm())
+            .setLogIndex(index);
 
     // ApplyTransaction call can come with an entryIndex much greater than
     // lastIndex updated because in between entries in the raft log can be
@@ -539,51 +546,16 @@ public class ContainerStateMachine extends BaseStateMachine {
           getRequestProto(trx.getStateMachineLogEntry().getLogData());
       Type cmdType = requestProto.getCmdType();
       CompletableFuture<Message> future;
-      if (cmdType == Type.PutBlock || cmdType == Type.PutSmallFile) {
-        BlockData blockData;
-        ContainerProtos.BlockData blockDataProto = cmdType == Type.PutBlock ?
-            requestProto.getPutBlock().getBlockData() :
-            requestProto.getPutSmallFile().getBlock().getBlockData();
-        // set the blockCommitSequenceId
-        try {
-          blockData = BlockData.getFromProtoBuf(blockDataProto);
-        } catch (IOException ioe) {
-          LOG.error("unable to retrieve blockData info for Block {}",
-              blockDataProto.getBlockID());
-          return completeExceptionally(ioe);
-        }
-        blockData.setBlockCommitSequenceId(index);
-        final ContainerProtos.PutBlockRequestProto putBlockRequestProto =
-            ContainerProtos.PutBlockRequestProto
-                .newBuilder(requestProto.getPutBlock())
-                .setBlockData(blockData.getProtoBufMessage()).build();
-        ContainerCommandRequestProto containerCommandRequestProto;
-        if (cmdType == Type.PutSmallFile) {
-          ContainerProtos.PutSmallFileRequestProto smallFileRequestProto =
-              ContainerProtos.PutSmallFileRequestProto
-                  .newBuilder(requestProto.getPutSmallFile())
-                  .setBlock(putBlockRequestProto).build();
-          containerCommandRequestProto =
-              ContainerCommandRequestProto.newBuilder(requestProto)
-                  .setPutSmallFile(smallFileRequestProto).build();
-        } else {
-          containerCommandRequestProto =
-              ContainerCommandRequestProto.newBuilder(requestProto)
-                  .setPutBlock(putBlockRequestProto).build();
-        }
-        future = CompletableFuture
-            .supplyAsync(() -> runCommand(containerCommandRequestProto),
-                getCommandExecutor(requestProto));
-      } else {
-        // Make sure that in write chunk, the user data is not set
-        if (cmdType == Type.WriteChunk) {
-          Preconditions.checkArgument(requestProto
-              .getWriteChunk().getData().isEmpty());
-        }
-        future = CompletableFuture.supplyAsync(() -> runCommand(requestProto),
-            getCommandExecutor(requestProto));
-      }
+      // Make sure that in write chunk, the user data is not set
+      if (cmdType == Type.WriteChunk) {
+        Preconditions
+            .checkArgument(requestProto.getWriteChunk().getData().isEmpty());
+        builder
+            .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
+      }
+      future = CompletableFuture
+          .supplyAsync(() -> runCommand(requestProto, builder.build()),
+              getCommandExecutor(requestProto));
       lastIndex = index;
       future.thenAccept(m -> {
         final Long previous =


@@ -0,0 +1,133 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.ozone.container.common.transport.server.ratis;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * DispatcherContext class holds transport protocol specific context info
 * required for execution of container commands over the container dispatcher.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DispatcherContext {
  /**
   * Determines which stage of writeChunk a write chunk request is for.
   */
  public enum WriteChunkStage {
    WRITE_DATA, COMMIT_DATA, COMBINED
  }

  // whether the chunk data needs to be written or committed or both
  private final WriteChunkStage stage;
  // indicates whether the read from tmp chunk files is allowed
  private final boolean readFromTmpFile;
  // which term the request is being served in Ratis
  private final long term;
  // the log index in Ratis log to which the request belongs to
  private final long logIndex;

  private DispatcherContext(long term, long index, WriteChunkStage stage,
      boolean readFromTmpFile) {
    this.term = term;
    this.logIndex = index;
    this.stage = stage;
    this.readFromTmpFile = readFromTmpFile;
  }

  public long getLogIndex() {
    return logIndex;
  }

  public boolean isReadFromTmpFile() {
    return readFromTmpFile;
  }

  public long getTerm() {
    return term;
  }

  public WriteChunkStage getStage() {
    return stage;
  }

  /**
   * Builder class for building DispatcherContext.
   */
  public static final class Builder {
    private WriteChunkStage stage = WriteChunkStage.COMBINED;
    private boolean readFromTmpFile = false;
    private long term;
    private long logIndex;

    /**
     * Sets the WriteChunkStage.
     *
     * @param stage WriteChunk Stage
     * @return DispatcherContext.Builder
     */
    public Builder setStage(WriteChunkStage stage) {
      this.stage = stage;
      return this;
    }

    /**
     * Sets the flag for reading from tmp chunk files.
     *
     * @param readFromTmpFile whether to read from tmp chunk file or not
     * @return DispatcherContext.Builder
     */
    public Builder setReadFromTmpFile(boolean readFromTmpFile) {
      this.readFromTmpFile = readFromTmpFile;
      return this;
    }

    /**
     * Sets the current term for the container request from Ratis.
     *
     * @param term current term
     * @return DispatcherContext.Builder
     */
    public Builder setTerm(long term) {
      this.term = term;
      return this;
    }

    /**
     * Sets the logIndex for the container request from Ratis.
     *
     * @param logIndex log index
     * @return DispatcherContext.Builder
     */
    public Builder setLogIndex(long logIndex) {
      this.logIndex = logIndex;
      return this;
    }

    /**
     * Builds and returns a DispatcherContext instance.
     *
     * @return DispatcherContext
     */
    public DispatcherContext build() {
      return new DispatcherContext(term, logIndex, stage, readFromTmpFile);
    }
  }
}
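
A rough sketch of the consumer side, mirroring how KeyValueHandler treats the context later in this commit: handlers must tolerate a null context (standalone dispatch) and fall back to defaults.

// Defaults when no Ratis context is supplied (dispatcherContext is the handler's parameter name).
WriteChunkStage stage =
    dispatcherContext == null ? WriteChunkStage.COMBINED : dispatcherContext.getStage();
long bcsId =
    dispatcherContext == null ? 0 : dispatcherContext.getLogIndex(); // blockCommitSequenceId
boolean readFromTmpFile =
    dispatcherContext != null && dispatcherContext.isReadFromTmpFile();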


@@ -58,6 +58,10 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+    .DispatcherContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+    .DispatcherContext.WriteChunkStage;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume
     .RoundRobinVolumeChoosingPolicy;
@@ -81,8 +85,6 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import static org.apache.hadoop.hdds.HddsConfigKeys
     .HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Stage;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
@@ -146,7 +148,8 @@ public class KeyValueHandler extends Handler {
 
   @Override
   public ContainerCommandResponseProto handle(
-      ContainerCommandRequestProto request, Container container) {
+      ContainerCommandRequestProto request, Container container,
+      DispatcherContext dispatcherContext) {
 
     Type cmdType = request.getCmdType();
     KeyValueContainer kvContainer = (KeyValueContainer) container;
@@ -164,7 +167,7 @@ public class KeyValueHandler extends Handler {
     case CloseContainer:
       return handleCloseContainer(request, kvContainer);
     case PutBlock:
-      return handlePutBlock(request, kvContainer);
+      return handlePutBlock(request, kvContainer, dispatcherContext);
     case GetBlock:
       return handleGetBlock(request, kvContainer);
     case DeleteBlock:
@@ -172,17 +175,17 @@ public class KeyValueHandler extends Handler {
     case ListBlock:
       return handleUnsupportedOp(request);
     case ReadChunk:
-      return handleReadChunk(request, kvContainer);
+      return handleReadChunk(request, kvContainer, dispatcherContext);
     case DeleteChunk:
       return handleDeleteChunk(request, kvContainer);
     case WriteChunk:
-      return handleWriteChunk(request, kvContainer);
+      return handleWriteChunk(request, kvContainer, dispatcherContext);
     case ListChunk:
       return handleUnsupportedOp(request);
     case CompactChunk:
       return handleUnsupportedOp(request);
     case PutSmallFile:
-      return handlePutSmallFile(request, kvContainer);
+      return handlePutSmallFile(request, kvContainer, dispatcherContext);
     case GetSmallFile:
       return handleGetSmallFile(request, kvContainer);
     case GetCommittedBlockLength:
@@ -392,7 +395,8 @@ public class KeyValueHandler extends Handler {
    * Handle Put Block operation. Calls BlockManager to process the request.
    */
   ContainerCommandResponseProto handlePutBlock(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer,
+      DispatcherContext dispatcherContext) {
 
     long blockLength;
     if (!request.hasPutBlock()) {
@@ -401,14 +405,18 @@ public class KeyValueHandler extends Handler {
       return ContainerUtils.malformedRequest(request);
     }
 
+    BlockData blockData;
     try {
       checkContainerOpen(kvContainer);
 
-      BlockData blockData = BlockData.getFromProtoBuf(
+      blockData = BlockData.getFromProtoBuf(
           request.getPutBlock().getBlockData());
       Preconditions.checkNotNull(blockData);
+      long bcsId =
+          dispatcherContext == null ? 0 : dispatcherContext.getLogIndex();
+      blockData.setBlockCommitSequenceId(bcsId);
       long numBytes = blockData.getProtoBufMessage().toByteArray().length;
-      blockLength = blockManager.putBlock(kvContainer, blockData);
+      blockManager.putBlock(kvContainer, blockData);
       metrics.incContainerBytesStats(Type.PutBlock, numBytes);
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
@@ -418,7 +426,7 @@ public class KeyValueHandler extends Handler {
           request);
     }
 
-    return BlockUtils.putBlockResponseSuccess(request, blockLength);
+    return BlockUtils.putBlockResponseSuccess(request, blockData);
   }
 
   /**
@@ -514,7 +522,8 @@ public class KeyValueHandler extends Handler {
    * Handle Read Chunk operation. Calls ChunkManager to process the request.
    */
   ContainerCommandResponseProto handleReadChunk(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer,
+      DispatcherContext dispatcherContext) {
 
     if (!request.hasReadChunk()) {
       LOG.debug("Malformed Read Chunk request. trace ID: {}",
@@ -531,8 +540,10 @@ public class KeyValueHandler extends Handler {
           .getChunkData());
       Preconditions.checkNotNull(chunkInfo);
 
-      data = chunkManager.readChunk(kvContainer, blockID, chunkInfo,
-          request.getReadChunk().getReadFromTmpFile());
+      boolean isReadFromTmpFile = dispatcherContext == null ? false :
+          dispatcherContext.isReadFromTmpFile();
+      data = chunkManager
+          .readChunk(kvContainer, blockID, chunkInfo, isReadFromTmpFile);
       metrics.incContainerBytesStats(Type.ReadChunk, data.length);
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
@@ -583,7 +594,8 @@ public class KeyValueHandler extends Handler {
    * Handle Write Chunk operation. Calls ChunkManager to process the request.
    */
   ContainerCommandResponseProto handleWriteChunk(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer,
+      DispatcherContext dispatcherContext) {
 
     if (!request.hasWriteChunk()) {
       LOG.debug("Malformed Write Chunk request. trace ID: {}",
@@ -602,17 +614,19 @@ public class KeyValueHandler extends Handler {
       Preconditions.checkNotNull(chunkInfo);
 
       ByteBuffer data = null;
-      if (request.getWriteChunk().getStage() == Stage.WRITE_DATA ||
-          request.getWriteChunk().getStage() == Stage.COMBINED) {
+      WriteChunkStage stage =
+          dispatcherContext == null ? WriteChunkStage.COMBINED :
+              dispatcherContext.getStage();
+      if (stage == WriteChunkStage.WRITE_DATA ||
+          stage == WriteChunkStage.COMBINED) {
         data = request.getWriteChunk().getData().asReadOnlyByteBuffer();
       }
 
-      chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data,
-          request.getWriteChunk().getStage());
+      chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, stage);
 
       // We should increment stats after writeChunk
-      if (request.getWriteChunk().getStage() == Stage.WRITE_DATA ||
-          request.getWriteChunk().getStage() == Stage.COMBINED) {
+      if (stage == WriteChunkStage.WRITE_DATA ||
+          stage == WriteChunkStage.COMBINED) {
         metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk()
             .getChunkData().getLen());
       }
@@ -633,7 +647,8 @@ public class KeyValueHandler extends Handler {
    * request.
    */
   ContainerCommandResponseProto handlePutSmallFile(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer,
+      DispatcherContext dispatcherContext) {
 
     if (!request.hasPutSmallFile()) {
       LOG.debug("Malformed Put Small File request. trace ID: {}",
@@ -642,13 +657,14 @@ public class KeyValueHandler extends Handler {
     }
     PutSmallFileRequestProto putSmallFileReq =
         request.getPutSmallFile();
+    BlockData blockData;
 
     try {
       checkContainerOpen(kvContainer);
 
       BlockID blockID = BlockID.getFromProtobuf(putSmallFileReq.getBlock()
           .getBlockData().getBlockID());
-      BlockData blockData = BlockData.getFromProtoBuf(
+      blockData = BlockData.getFromProtoBuf(
           putSmallFileReq.getBlock().getBlockData());
       Preconditions.checkNotNull(blockData);
 
@@ -656,15 +672,20 @@ public class KeyValueHandler extends Handler {
           putSmallFileReq.getChunkInfo());
       Preconditions.checkNotNull(chunkInfo);
       ByteBuffer data = putSmallFileReq.getData().asReadOnlyByteBuffer();
+      WriteChunkStage stage =
+          dispatcherContext == null ? WriteChunkStage.COMBINED :
+              dispatcherContext.getStage();
       // chunks will be committed as a part of handling putSmallFile
       // here. There is no need to maintain this info in openContainerBlockMap.
-      chunkManager.writeChunk(
-          kvContainer, blockID, chunkInfo, data, Stage.COMBINED);
+      chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, stage);
 
       List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
       chunks.add(chunkInfo.getProtoBufMessage());
       blockData.setChunks(chunks);
-      // TODO: add bcsId as a part of putSmallFile transaction
+      long bcsId =
+          dispatcherContext == null ? 0 : dispatcherContext.getLogIndex();
+      blockData.setBlockCommitSequenceId(bcsId);
       blockManager.putBlock(kvContainer, blockData);
 
       metrics.incContainerBytesStats(Type.PutSmallFile, data.capacity());
@@ -676,7 +697,7 @@ public class KeyValueHandler extends Handler {
           PUT_SMALL_FILE_ERROR), request);
     }
 
-    return SmallFileUtils.getPutFileResponseSuccess(request);
+    return SmallFileUtils.getPutFileResponseSuccess(request, blockData);
   }
 
   /**


@@ -133,12 +133,12 @@ public final class BlockUtils {
    * @return Response.
    */
   public static ContainerCommandResponseProto putBlockResponseSuccess(
-      ContainerCommandRequestProto msg, long blockLength) {
-    ContainerProtos.BlockData blockData = msg.getPutBlock().getBlockData();
+      ContainerCommandRequestProto msg, BlockData blockData) {
+    ContainerProtos.BlockData blockDataProto = blockData.getProtoBufMessage();
 
     GetCommittedBlockLengthResponseProto.Builder
         committedBlockLengthResponseBuilder =
-        getCommittedBlockLengthResponseBuilder(blockLength,
-            blockData.getBlockID());
+        getCommittedBlockLengthResponseBuilder(blockData.getSize(),
+            blockDataProto.getBlockID());
     PutBlockResponseProto.Builder putKeyResponse =
         PutBlockResponseProto.newBuilder();
 
     putKeyResponse


@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -44,15 +45,14 @@ public final class SmallFileUtils {
    * @return - ContainerCommandResponseProto
    */
   public static ContainerCommandResponseProto getPutFileResponseSuccess(
-      ContainerCommandRequestProto msg) {
+      ContainerCommandRequestProto msg, BlockData blockData) {
     ContainerProtos.PutSmallFileResponseProto.Builder getResponse =
         ContainerProtos.PutSmallFileResponseProto.newBuilder();
-    ContainerProtos.BlockData blockData =
-        msg.getPutSmallFile().getBlock().getBlockData();
+    ContainerProtos.BlockData blockDataProto = blockData.getProtoBufMessage();
     ContainerProtos.GetCommittedBlockLengthResponseProto.Builder
         committedBlockLengthResponseBuilder = BlockUtils
-        .getCommittedBlockLengthResponseBuilder(blockData.getSize(),
-            blockData.getBlockID());
+        .getCommittedBlockLengthResponseBuilder(blockDataProto.getSize(),
+            blockDataProto.getBlockID());
     getResponse.setCommittedBlockLength(committedBlockLengthResponseBuilder);
     ContainerCommandResponseProto.Builder builder =
         ContainerUtils.getSuccessResponseBuilder(msg);


@@ -21,10 +21,10 @@ package org.apache.hadoop.ozone.container.keyvalue.impl;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
@@ -66,7 +66,7 @@ public class ChunkManagerImpl implements ChunkManager {
    * @throws StorageContainerException
    */
   public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
-      ByteBuffer data, ContainerProtos.Stage stage)
+      ByteBuffer data, DispatcherContext.WriteChunkStage stage)
       throws StorageContainerException {
     try {


@@ -19,10 +19,11 @@ package org.apache.hadoop.ozone.container.keyvalue.interfaces;
  */
 
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 
 import java.nio.ByteBuffer;
 
 /**
@@ -42,7 +43,7 @@ public interface ChunkManager {
    * @throws StorageContainerException
    */
   void writeChunk(Container container, BlockID blockID, ChunkInfo info,
-      ByteBuffer data, ContainerProtos.Stage stage)
+      ByteBuffer data, DispatcherContext.WriteChunkStage stage)
       throws StorageContainerException;
 
   /**
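
For orientation only, a sketch of how an implementation might branch on the stage; the temporary-file behaviour matches what TestChunkManagerImpl asserts below, but the helper method names here are hypothetical, not part of ChunkManagerImpl.

// WRITE_DATA stages the bytes into a temporary chunk file, COMMIT_DATA
// promotes that temporary file to the final chunk file, COMBINED does both.
switch (stage) {
case WRITE_DATA:
  writeToTmpChunkFile(chunkFile, data);   // hypothetical helper
  break;
case COMMIT_DATA:
  commitTmpChunkFile(chunkFile);          // hypothetical helper
  break;
case COMBINED:
  writeToTmpChunkFile(chunkFile, data);
  commitTmpChunkFile(chunkFile);
  break;
default:
  throw new IllegalArgumentException("Unknown WriteChunkStage: " + stage);
}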


@@ -99,16 +99,16 @@ public class TestHddsDispatcher {
       HddsDispatcher hddsDispatcher = new HddsDispatcher(
           conf, containerSet, volumeSet, handlers, context, metrics);
       hddsDispatcher.setScmId(scmId.toString());
-      ContainerCommandResponseProto responseOne = hddsDispatcher.dispatch(
-          getWriteChunkRequest(dd.getUuidString(), 1L, 1L));
+      ContainerCommandResponseProto responseOne = hddsDispatcher
+          .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 1L), null);
       Assert.assertEquals(ContainerProtos.Result.SUCCESS,
           responseOne.getResult());
       verify(context, times(0))
           .addContainerActionIfAbsent(Mockito.any(ContainerAction.class));
       containerData.setBytesUsed(Double.valueOf(
           StorageUnit.MB.toBytes(950)).longValue());
-      ContainerCommandResponseProto responseTwo = hddsDispatcher.dispatch(
-          getWriteChunkRequest(dd.getUuidString(), 1L, 2L));
+      ContainerCommandResponseProto responseTwo = hddsDispatcher
+          .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null);
       Assert.assertEquals(ContainerProtos.Result.SUCCESS,
           responseTwo.getResult());
       verify(context, times(1))
@@ -150,16 +150,16 @@ public class TestHddsDispatcher {
           getWriteChunkRequest(dd.getUuidString(), 1L, 1L);
       // send read chunk request and make sure container does not exist
       ContainerCommandResponseProto response =
-          hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest));
+          hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest), null);
       Assert.assertEquals(response.getResult(),
           ContainerProtos.Result.CONTAINER_NOT_FOUND);
       // send write chunk request without sending create container
-      response = hddsDispatcher.dispatch(writeChunkRequest);
+      response = hddsDispatcher.dispatch(writeChunkRequest, null);
       // container should be created as part of write chunk request
       Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
       // send read chunk request to read the chunk written above
       response =
-          hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest));
+          hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest), null);
       Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
       Assert.assertEquals(response.getReadChunk().getData(),
           writeChunkRequest.getWriteChunk().getData());


@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
@@ -114,7 +115,7 @@ public class TestChunkManagerImpl {
     // As no chunks are written to the volume writeBytes should be 0
     checkWriteIOStats(0, 0);
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), ContainerProtos.Stage.WRITE_DATA);
+        ByteBuffer.wrap(data), WriteChunkStage.WRITE_DATA);
     // Now a chunk file is being written with Stage WRITE_DATA, so it should
     // create a temporary chunk file.
     assertTrue(chunksPath.listFiles().length == 1);
@@ -131,7 +132,7 @@ public class TestChunkManagerImpl {
     checkWriteIOStats(data.length, 1);
 
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), ContainerProtos.Stage.COMMIT_DATA);
+        ByteBuffer.wrap(data), WriteChunkStage.COMMIT_DATA);
 
     checkWriteIOStats(data.length, 1);
 
@@ -151,7 +152,7 @@ public class TestChunkManagerImpl {
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), 0), 0, randomLength);
       chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-          ByteBuffer.wrap(data), ContainerProtos.Stage.WRITE_DATA);
+          ByteBuffer.wrap(data), WriteChunkStage.WRITE_DATA);
       fail("testWriteChunkIncorrectLength failed");
     } catch (StorageContainerException ex) {
       // As we got an exception, writeBytes should be 0.
@@ -172,7 +173,7 @@ public class TestChunkManagerImpl {
     assertTrue(chunksPath.listFiles().length == 0);
     checkWriteIOStats(0, 0);
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
+        ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
     // Now a chunk file is being written with Stage COMBINED_DATA, so it should
     // create a chunk file.
     assertTrue(chunksPath.listFiles().length == 1);
@@ -185,7 +186,7 @@ public class TestChunkManagerImpl {
   public void testReadChunk() throws Exception {
     checkWriteIOStats(0, 0);
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
+        ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
     checkWriteIOStats(data.length, 1);
     checkReadIOStats(0, 0);
     byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
@@ -199,7 +200,7 @@ public class TestChunkManagerImpl {
   public void testDeleteChunk() throws Exception {
     File chunksPath = new File(keyValueContainerData.getChunksPath());
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
+        ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
     assertTrue(chunksPath.listFiles().length == 1);
     chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
     assertTrue(chunksPath.listFiles().length == 0);
@@ -209,7 +210,7 @@ public class TestChunkManagerImpl {
   public void testDeleteChunkUnsupportedRequest() throws Exception {
     try {
       chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-          ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
+          ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
       long randomLength = 200L;
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), 0), 0, randomLength);
@@ -241,7 +242,7 @@ public class TestChunkManagerImpl {
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), i), 0, data.length);
       chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-          ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
+          ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
     }
     checkWriteIOStats(data.length*100, 100);
     assertTrue(hddsVolume.getVolumeIOStats().getWriteTime() > 0);


@@ -35,6 +35,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -84,10 +85,10 @@ public class TestKeyValueHandler {
     handler = Mockito.mock(KeyValueHandler.class);
     dispatcher = Mockito.mock(HddsDispatcher.class);
     Mockito.when(dispatcher.getHandler(any())).thenReturn(handler);
-    Mockito.when(dispatcher.dispatch(any())).thenCallRealMethod();
+    Mockito.when(dispatcher.dispatch(any(), any())).thenCallRealMethod();
     Mockito.when(dispatcher.getContainer(anyLong())).thenReturn(
         Mockito.mock(KeyValueContainer.class));
-    Mockito.when(handler.handle(any(), any())).thenCallRealMethod();
+    Mockito.when(handler.handle(any(), any(), any())).thenCallRealMethod();
     doCallRealMethod().when(dispatcher).setMetricsForTesting(any());
     dispatcher.setMetricsForTesting(Mockito.mock(ContainerMetrics.class));
     Mockito.when(dispatcher.buildAuditMessageForFailure(any(), any(), any()))
@@ -111,112 +112,113 @@ public class TestKeyValueHandler {
         .setCreateContainer(ContainerProtos.CreateContainerRequestProto
             .getDefaultInstance())
         .build();
-    dispatcher.dispatch(createContainerRequest);
+    DispatcherContext context = new DispatcherContext.Builder().build();
+    dispatcher.dispatch(createContainerRequest, context);
     Mockito.verify(handler, times(1)).handleCreateContainer(
         any(ContainerCommandRequestProto.class), any());
 
     // Test Read Container Request handling
     ContainerCommandRequestProto readContainerRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.ReadContainer);
-    dispatcher.dispatch(readContainerRequest);
+    dispatcher.dispatch(readContainerRequest, context);
     Mockito.verify(handler, times(1)).handleReadContainer(
         any(ContainerCommandRequestProto.class), any());
 
     // Test Update Container Request handling
     ContainerCommandRequestProto updateContainerRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.UpdateContainer);
-    dispatcher.dispatch(updateContainerRequest);
+    dispatcher.dispatch(updateContainerRequest, context);
     Mockito.verify(handler, times(1)).handleUpdateContainer(
         any(ContainerCommandRequestProto.class), any());
 
     // Test Delete Container Request handling
     ContainerCommandRequestProto deleteContainerRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.DeleteContainer);
-    dispatcher.dispatch(deleteContainerRequest);
+    dispatcher.dispatch(deleteContainerRequest, null);
     Mockito.verify(handler, times(1)).handleDeleteContainer(
         any(ContainerCommandRequestProto.class), any());
 
     // Test List Container Request handling
     ContainerCommandRequestProto listContainerRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.ListContainer);
-    dispatcher.dispatch(listContainerRequest);
+    dispatcher.dispatch(listContainerRequest, context);
     Mockito.verify(handler, times(1)).handleUnsupportedOp(
         any(ContainerCommandRequestProto.class));
 
     // Test Close Container Request handling
ContainerCommandRequestProto closeContainerRequest = ContainerCommandRequestProto closeContainerRequest =
getDummyCommandRequestProto(ContainerProtos.Type.CloseContainer); getDummyCommandRequestProto(ContainerProtos.Type.CloseContainer);
dispatcher.dispatch(closeContainerRequest); dispatcher.dispatch(closeContainerRequest, context);
Mockito.verify(handler, times(1)).handleCloseContainer( Mockito.verify(handler, times(1)).handleCloseContainer(
any(ContainerCommandRequestProto.class), any()); any(ContainerCommandRequestProto.class), any());
// Test Put Block Request handling // Test Put Block Request handling
ContainerCommandRequestProto putBlockRequest = ContainerCommandRequestProto putBlockRequest =
getDummyCommandRequestProto(ContainerProtos.Type.PutBlock); getDummyCommandRequestProto(ContainerProtos.Type.PutBlock);
dispatcher.dispatch(putBlockRequest); dispatcher.dispatch(putBlockRequest, context);
Mockito.verify(handler, times(1)).handlePutBlock( Mockito.verify(handler, times(1)).handlePutBlock(
any(ContainerCommandRequestProto.class), any()); any(ContainerCommandRequestProto.class), any(), any());
// Test Get Block Request handling // Test Get Block Request handling
ContainerCommandRequestProto getBlockRequest = ContainerCommandRequestProto getBlockRequest =
getDummyCommandRequestProto(ContainerProtos.Type.GetBlock); getDummyCommandRequestProto(ContainerProtos.Type.GetBlock);
dispatcher.dispatch(getBlockRequest); dispatcher.dispatch(getBlockRequest, context);
Mockito.verify(handler, times(1)).handleGetBlock( Mockito.verify(handler, times(1)).handleGetBlock(
any(ContainerCommandRequestProto.class), any()); any(ContainerCommandRequestProto.class), any());
// Test Delete Block Request handling // Test Delete Block Request handling
ContainerCommandRequestProto deleteBlockRequest = ContainerCommandRequestProto deleteBlockRequest =
getDummyCommandRequestProto(ContainerProtos.Type.DeleteBlock); getDummyCommandRequestProto(ContainerProtos.Type.DeleteBlock);
dispatcher.dispatch(deleteBlockRequest); dispatcher.dispatch(deleteBlockRequest, context);
Mockito.verify(handler, times(1)).handleDeleteBlock( Mockito.verify(handler, times(1)).handleDeleteBlock(
any(ContainerCommandRequestProto.class), any()); any(ContainerCommandRequestProto.class), any());
// Test List Block Request handling // Test List Block Request handling
ContainerCommandRequestProto listBlockRequest = ContainerCommandRequestProto listBlockRequest =
getDummyCommandRequestProto(ContainerProtos.Type.ListBlock); getDummyCommandRequestProto(ContainerProtos.Type.ListBlock);
dispatcher.dispatch(listBlockRequest); dispatcher.dispatch(listBlockRequest, context);
Mockito.verify(handler, times(2)).handleUnsupportedOp( Mockito.verify(handler, times(2)).handleUnsupportedOp(
any(ContainerCommandRequestProto.class)); any(ContainerCommandRequestProto.class));
// Test Read Chunk Request handling // Test Read Chunk Request handling
ContainerCommandRequestProto readChunkRequest = ContainerCommandRequestProto readChunkRequest =
getDummyCommandRequestProto(ContainerProtos.Type.ReadChunk); getDummyCommandRequestProto(ContainerProtos.Type.ReadChunk);
dispatcher.dispatch(readChunkRequest); dispatcher.dispatch(readChunkRequest, context);
Mockito.verify(handler, times(1)).handleReadChunk( Mockito.verify(handler, times(1)).handleReadChunk(
any(ContainerCommandRequestProto.class), any()); any(ContainerCommandRequestProto.class), any(), any());
// Test Delete Chunk Request handling // Test Delete Chunk Request handling
ContainerCommandRequestProto deleteChunkRequest = ContainerCommandRequestProto deleteChunkRequest =
getDummyCommandRequestProto(ContainerProtos.Type.DeleteChunk); getDummyCommandRequestProto(ContainerProtos.Type.DeleteChunk);
dispatcher.dispatch(deleteChunkRequest); dispatcher.dispatch(deleteChunkRequest, context);
Mockito.verify(handler, times(1)).handleDeleteChunk( Mockito.verify(handler, times(1)).handleDeleteChunk(
any(ContainerCommandRequestProto.class), any()); any(ContainerCommandRequestProto.class), any());
// Test Write Chunk Request handling // Test Write Chunk Request handling
ContainerCommandRequestProto writeChunkRequest = ContainerCommandRequestProto writeChunkRequest =
getDummyCommandRequestProto(ContainerProtos.Type.WriteChunk); getDummyCommandRequestProto(ContainerProtos.Type.WriteChunk);
dispatcher.dispatch(writeChunkRequest); dispatcher.dispatch(writeChunkRequest, context);
Mockito.verify(handler, times(1)).handleWriteChunk( Mockito.verify(handler, times(1)).handleWriteChunk(
any(ContainerCommandRequestProto.class), any()); any(ContainerCommandRequestProto.class), any(), any());
// Test List Chunk Request handling // Test List Chunk Request handling
ContainerCommandRequestProto listChunkRequest = ContainerCommandRequestProto listChunkRequest =
getDummyCommandRequestProto(ContainerProtos.Type.ListChunk); getDummyCommandRequestProto(ContainerProtos.Type.ListChunk);
dispatcher.dispatch(listChunkRequest); dispatcher.dispatch(listChunkRequest, context);
Mockito.verify(handler, times(3)).handleUnsupportedOp( Mockito.verify(handler, times(3)).handleUnsupportedOp(
any(ContainerCommandRequestProto.class)); any(ContainerCommandRequestProto.class));
// Test Put Small File Request handling // Test Put Small File Request handling
ContainerCommandRequestProto putSmallFileRequest = ContainerCommandRequestProto putSmallFileRequest =
getDummyCommandRequestProto(ContainerProtos.Type.PutSmallFile); getDummyCommandRequestProto(ContainerProtos.Type.PutSmallFile);
dispatcher.dispatch(putSmallFileRequest); dispatcher.dispatch(putSmallFileRequest, context);
Mockito.verify(handler, times(1)).handlePutSmallFile( Mockito.verify(handler, times(1)).handlePutSmallFile(
any(ContainerCommandRequestProto.class), any()); any(ContainerCommandRequestProto.class), any(), any());
// Test Get Small File Request handling // Test Get Small File Request handling
ContainerCommandRequestProto getSmallFileRequest = ContainerCommandRequestProto getSmallFileRequest =
getDummyCommandRequestProto(ContainerProtos.Type.GetSmallFile); getDummyCommandRequestProto(ContainerProtos.Type.GetSmallFile);
dispatcher.dispatch(getSmallFileRequest); dispatcher.dispatch(getSmallFileRequest, context);
Mockito.verify(handler, times(1)).handleGetSmallFile( Mockito.verify(handler, times(1)).handleGetSmallFile(
any(ContainerCommandRequestProto.class), any()); any(ContainerCommandRequestProto.class), any());
} }
@ -294,7 +296,7 @@ public class TestKeyValueHandler {
.setCloseContainer(ContainerProtos.CloseContainerRequestProto .setCloseContainer(ContainerProtos.CloseContainerRequestProto
.getDefaultInstance()) .getDefaultInstance())
.build(); .build();
dispatcher.dispatch(closeContainerRequest); dispatcher.dispatch(closeContainerRequest, null);
Mockito.when(handler.handleCloseContainer(any(), any())) Mockito.when(handler.handleCloseContainer(any(), any()))
.thenCallRealMethod(); .thenCallRealMethod();
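Note: as the TestKeyValueHandler changes above show, every dispatcher.dispatch() call now takes a DispatcherContext built through its Builder. A minimal sketch of how a Ratis-driven caller might assemble one follows; the Builder setters used here (setStage, setReadFromTmpFile, setTerm, setLogIndex) are assumptions inferred from the write-stage and read-from-tmp-file information DispatcherContext is meant to carry, not a verbatim copy of the committed API.

import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;

public final class DispatcherContextSketch {
  private DispatcherContextSketch() { }

  // Builds a context for a WriteChunk request applied from the Ratis log.
  // Setter names below are assumed, not taken from the patch itself.
  static DispatcherContext combinedWriteContext(long term, long logIndex) {
    return new DispatcherContext.Builder()
        .setStage(WriteChunkStage.COMBINED)  // write stage formerly sent on the wire
        .setReadFromTmpFile(false)           // read-from-tmp-file hint formerly sent on the wire
        .setTerm(term)                       // Ratis term of the applied log entry
        .setLogIndex(logIndex)               // Ratis log index of the applied log entry
        .build();
  }
}

The result is then passed as the second argument to dispatcher.dispatch(request, context), exactly as the test code above does.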


@ -78,7 +78,7 @@ import java.util.UUID;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage.COMBINED; import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk; import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData; import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum; import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum;
@ -334,7 +334,7 @@ public class TestContainerPersistence {
byte[] data = getData(datalen); byte[] data = getData(datalen);
setDataChecksum(info, data); setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED); WriteChunkStage.COMBINED);
return info; return info;
} }
@ -375,7 +375,7 @@ public class TestContainerPersistence {
byte[] data = getData(datalen); byte[] data = getData(datalen);
setDataChecksum(info, data); setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED); WriteChunkStage.COMBINED);
String fileName = String.format("%s.data.%d", blockID.getLocalID(), x); String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
fileHashMap.put(fileName, info); fileHashMap.put(fileName, info);
} }
@ -433,7 +433,7 @@ public class TestContainerPersistence {
byte[] data = getData(datalen); byte[] data = getData(datalen);
setDataChecksum(info, data); setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED); WriteChunkStage.COMBINED);
byte[] readData = chunkManager.readChunk(container, blockID, info, false); byte[] readData = chunkManager.readChunk(container, blockID, info, false);
assertTrue(Arrays.equals(data, readData)); assertTrue(Arrays.equals(data, readData));
@ -466,13 +466,13 @@ public class TestContainerPersistence {
byte[] data = getData(datalen); byte[] data = getData(datalen);
setDataChecksum(info, data); setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED); WriteChunkStage.COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED); WriteChunkStage.COMBINED);
// With the overwrite flag it should work now. // With the overwrite flag it should work now.
info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true"); info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED); WriteChunkStage.COMBINED);
long bytesUsed = container.getContainerData().getBytesUsed(); long bytesUsed = container.getContainerData().getBytesUsed();
Assert.assertEquals(datalen, bytesUsed); Assert.assertEquals(datalen, bytesUsed);
@ -507,7 +507,7 @@ public class TestContainerPersistence {
oldSha.update(data); oldSha.update(data);
setDataChecksum(info, data); setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED); WriteChunkStage.COMBINED);
} }
// Request to read the whole data in a single go. // Request to read the whole data in a single go.
@ -540,7 +540,7 @@ public class TestContainerPersistence {
byte[] data = getData(datalen); byte[] data = getData(datalen);
setDataChecksum(info, data); setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED); WriteChunkStage.COMBINED);
chunkManager.deleteChunk(container, blockID, info); chunkManager.deleteChunk(container, blockID, info);
exception.expect(StorageContainerException.class); exception.expect(StorageContainerException.class);
exception.expectMessage("Unable to find the chunk file."); exception.expectMessage("Unable to find the chunk file.");
@ -655,7 +655,7 @@ public class TestContainerPersistence {
byte[] data = getData(datalen); byte[] data = getData(datalen);
setDataChecksum(info, data); setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED); WriteChunkStage.COMBINED);
totalSize += datalen; totalSize += datalen;
chunkList.add(info); chunkList.add(info);
} }
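In TestContainerPersistence the stage argument of chunkManager.writeChunk changes from ContainerProtos.Stage.COMBINED to DispatcherContext.WriteChunkStage.COMBINED. The tests above only exercise the combined stage; the sketch below shows how a split write-then-commit sequence would look, assuming WriteChunkStage also defines WRITE_DATA and COMMIT_DATA values and that COMMIT_DATA promotes the temporary chunk file written by WRITE_DATA. Both points, and the import locations, are assumptions for illustration rather than facts taken from this patch.

import java.nio.ByteBuffer;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;

public final class TwoStageWriteSketch {
  private TwoStageWriteSketch() { }

  // Mirrors the writeChunk call shape used by the tests above, but drives the
  // assumed two-stage path instead of COMBINED.
  static void writeThenCommit(ChunkManager chunkManager, Container container,
      BlockID blockID, ChunkInfo info, byte[] data) throws Exception {
    // Stage 1 (assumed): persist the bytes to a temporary chunk file.
    chunkManager.writeChunk(container, blockID, info,
        ByteBuffer.wrap(data), WriteChunkStage.WRITE_DATA);
    // Stage 2 (assumed): commit the previously written temporary file into place.
    chunkManager.writeChunk(container, blockID, info,
        ByteBuffer.wrap(data), WriteChunkStage.COMMIT_DATA);
  }
}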


@ -169,7 +169,8 @@ public class TestCSMMetrics {
*/ */
@Override @Override
public ContainerCommandResponseProto dispatch( public ContainerCommandResponseProto dispatch(
ContainerCommandRequestProto msg) { ContainerCommandRequestProto msg,
DispatcherContext context) {
return ContainerTestHelper.getCreateContainerResponse(msg); return ContainerTestHelper.getCreateContainerResponse(msg);
} }


@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.replication.GrpcReplicationService; import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
@ -236,8 +237,9 @@ public class TestContainerServer {
* @return Command Response * @return Command Response
*/ */
@Override @Override
public ContainerCommandResponseProto public ContainerCommandResponseProto dispatch(
dispatch(ContainerCommandRequestProto msg) { ContainerCommandRequestProto msg,
DispatcherContext context) {
return ContainerTestHelper.getCreateContainerResponse(msg); return ContainerTestHelper.getCreateContainerResponse(msg);
} }


@ -130,7 +130,7 @@ public class BenchMarkDatanodeDispatcher {
for (int x = 0; x < INIT_CONTAINERS; x++) { for (int x = 0; x < INIT_CONTAINERS; x++) {
long containerID = HddsUtils.getUtcTime() + x; long containerID = HddsUtils.getUtcTime() + x;
ContainerCommandRequestProto req = getCreateContainerCommand(containerID); ContainerCommandRequestProto req = getCreateContainerCommand(containerID);
dispatcher.dispatch(req); dispatcher.dispatch(req, null);
containers.add(containerID); containers.add(containerID);
containerCount.getAndIncrement(); containerCount.getAndIncrement();
} }
@ -153,8 +153,8 @@ public class BenchMarkDatanodeDispatcher {
long containerID = containers.get(y); long containerID = containers.get(y);
BlockID blockID = new BlockID(containerID, key); BlockID blockID = new BlockID(containerID, key);
dispatcher dispatcher
.dispatch(getPutBlockCommand(blockID, chunkName)); .dispatch(getPutBlockCommand(blockID, chunkName), null);
dispatcher.dispatch(getWriteChunkCommand(blockID, chunkName)); dispatcher.dispatch(getWriteChunkCommand(blockID, chunkName), null);
} }
} }
} }
@ -268,7 +268,7 @@ public class BenchMarkDatanodeDispatcher {
public void createContainer(BenchMarkDatanodeDispatcher bmdd) { public void createContainer(BenchMarkDatanodeDispatcher bmdd) {
long containerID = RandomUtils.nextLong(); long containerID = RandomUtils.nextLong();
ContainerCommandRequestProto req = getCreateContainerCommand(containerID); ContainerCommandRequestProto req = getCreateContainerCommand(containerID);
bmdd.dispatcher.dispatch(req); bmdd.dispatcher.dispatch(req, null);
bmdd.containers.add(containerID); bmdd.containers.add(containerID);
bmdd.containerCount.getAndIncrement(); bmdd.containerCount.getAndIncrement();
} }
@ -277,27 +277,27 @@ public class BenchMarkDatanodeDispatcher {
@Benchmark @Benchmark
public void writeChunk(BenchMarkDatanodeDispatcher bmdd) { public void writeChunk(BenchMarkDatanodeDispatcher bmdd) {
bmdd.dispatcher.dispatch(getWriteChunkCommand( bmdd.dispatcher.dispatch(getWriteChunkCommand(
getRandomBlockID(), getNewChunkToWrite())); getRandomBlockID(), getNewChunkToWrite()), null);
} }
@Benchmark @Benchmark
public void readChunk(BenchMarkDatanodeDispatcher bmdd) { public void readChunk(BenchMarkDatanodeDispatcher bmdd) {
BlockID blockID = getRandomBlockID(); BlockID blockID = getRandomBlockID();
String chunkKey = getRandomChunkToRead(); String chunkKey = getRandomChunkToRead();
bmdd.dispatcher.dispatch(getReadChunkCommand(blockID, chunkKey)); bmdd.dispatcher.dispatch(getReadChunkCommand(blockID, chunkKey), null);
} }
@Benchmark @Benchmark
public void putBlock(BenchMarkDatanodeDispatcher bmdd) { public void putBlock(BenchMarkDatanodeDispatcher bmdd) {
BlockID blockID = getRandomBlockID(); BlockID blockID = getRandomBlockID();
String chunkKey = getNewChunkToWrite(); String chunkKey = getNewChunkToWrite();
bmdd.dispatcher.dispatch(getPutBlockCommand(blockID, chunkKey)); bmdd.dispatcher.dispatch(getPutBlockCommand(blockID, chunkKey), null);
} }
@Benchmark @Benchmark
public void getBlock(BenchMarkDatanodeDispatcher bmdd) { public void getBlock(BenchMarkDatanodeDispatcher bmdd) {
BlockID blockID = getRandomBlockID(); BlockID blockID = getRandomBlockID();
bmdd.dispatcher.dispatch(getGetBlockCommand(blockID)); bmdd.dispatcher.dispatch(getGetBlockCommand(blockID), null);
} }
// Chunks writes from benchmark only reaches certain containers // Chunks writes from benchmark only reaches certain containers
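BenchMarkDatanodeDispatcher passes null for the new context argument because it drives HddsDispatcher directly rather than through the Ratis state machine. A small hedged sketch of how a handler might normalize a missing context is below; the null-tolerance policy and the getStage() accessor are assumptions made for illustration, not behavior confirmed by this patch.

import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;

final class ContextDefaultsSketch {
  private ContextDefaultsSketch() { }

  // Out-of-band callers (standalone clients, benchmarks) may pass no context;
  // treating that case as a combined write keeps the pre-patch behavior.
  static WriteChunkStage stageOf(DispatcherContext context) {
    return context == null ? WriteChunkStage.COMBINED : context.getStage();
  }
}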