HDFS-13078. Ozone: Update Ratis on Ozone to 0.1.1-alpha-8fd74ed-SNAPSHOT.

To fix large chunk reads (>4M) from Datanodes. Contributed by Mukul Kumar Singh.
Anu Engineer 2018-02-20 10:53:33 -08:00 committed by Owen O'Malley
parent 254cbe2589
commit 4c10a849e8
4 changed files with 200 additions and 67 deletions

RatisHelper.java

@@ -19,15 +19,18 @@
 package org.apache.ratis;

 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;

 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,6 +116,8 @@ public interface RatisHelper {
     LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);
     final RaftProperties properties = new RaftProperties();
     RaftConfigKeys.Rpc.setType(properties, rpcType);
+    GrpcConfigKeys.setMessageSizeMax(properties,
+        SizeInBytes.valueOf(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE));
     return RaftClient.newBuilder()
         .setRaftGroup(group)
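
Note: this client-side change is the core of the fix. gRPC caps messages at 4 MB by default, so chunk reads above that size failed until the limit is raised to the configured maximum chunk size, which the updated Ratis snapshot exposes via GrpcConfigKeys.setMessageSizeMax. A minimal sketch of the resulting client setup, assuming the gRPC transport and a hypothetical 16 MB chunk limit in place of the OzoneConfigKeys constant:

import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.SizeInBytes;

public final class LargeChunkClientSketch {
  // Hypothetical stand-in for OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE.
  private static final long CHUNK_MAX_SIZE = 16L * 1024 * 1024;

  static RaftClient newRaftClient(RaftGroup group) {
    final RaftProperties properties = new RaftProperties();
    // Use gRPC as the Raft transport, as the Ozone client does here.
    RaftConfigKeys.Rpc.setType(properties, SupportedRpcType.GRPC);
    // Lift the gRPC message cap above its 4 MB default so that a full
    // chunk fits into a single request/response message.
    GrpcConfigKeys.setMessageSizeMax(properties,
        SizeInBytes.valueOf(CHUNK_MAX_SIZE));
    return RaftClient.newBuilder()
        .setRaftGroup(group)
        .setProperties(properties)
        .build();
  }
}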

ContainerStateMachine.java

@@ -26,17 +26,17 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkReq
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.ratis.statemachine.BaseStateMachine;
-import org.apache.ratis.statemachine.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.TransactionContextImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
@@ -55,8 +55,7 @@ import java.util.concurrent.ConcurrentHashMap;
  *
  * Read only requests are classified in
  * {@link org.apache.hadoop.scm.XceiverClientRatis#isReadOnly}
- * and these readonly requests are replied from the
- * {@link #query(RaftClientRequest)}
+ * and these readonly requests are replied from the {@link #query(Message)}.
  *
  * The write requests can be divided into requests with user data
  * (WriteChunkRequest) and other request without user data.
@@ -90,7 +89,7 @@ public class ContainerStateMachine extends BaseStateMachine {
       = new SimpleStateMachineStorage();
   private final ContainerDispatcher dispatcher;
   private ThreadPoolExecutor writeChunkExecutor;
-  private final ConcurrentHashMap<String, CompletableFuture<Message>>
+  private final ConcurrentHashMap<Long, CompletableFuture<Message>>
       writeChunkFutureMap;
   private final ConcurrentHashMap<String, CompletableFuture<Message>>
       createContainerFutureMap;
@@ -171,7 +170,7 @@ public class ContainerStateMachine extends BaseStateMachine {
           .setData(request.getMessage().getContent())
           .build();
     }
-    return new TransactionContext(this, request, log);
+    return new TransactionContextImpl(this, request, log);
   }

   private ByteString getShadedByteString(ContainerCommandRequestProto proto) {
@@ -191,34 +190,47 @@
     return () -> ShadedProtoUtil.asShadedByteString(response.toByteArray());
   }

+  private CompletableFuture<Message> handleWriteChunk(
+      ContainerCommandRequestProto requestProto, long entryIndex) {
+    final WriteChunkRequestProto write = requestProto.getWriteChunk();
+    String containerName = write.getPipeline().getContainerName();
+    CompletableFuture<Message> future =
+        createContainerFutureMap.get(containerName);
+    CompletableFuture<Message> writeChunkFuture;
+    if (future != null) {
+      writeChunkFuture = future.thenApplyAsync(
+          v -> runCommand(requestProto), writeChunkExecutor);
+    } else {
+      writeChunkFuture = CompletableFuture.supplyAsync(
+          () -> runCommand(requestProto), writeChunkExecutor);
+    }
+    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+    return writeChunkFuture;
+  }
+
+  private CompletableFuture<Message> handleCreateContainer(
+      ContainerCommandRequestProto requestProto) {
+    String containerName =
+        requestProto.getCreateContainer().getContainerData().getName();
+    createContainerFutureMap.
+        computeIfAbsent(containerName, k -> new CompletableFuture<>());
+    return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
+  }
+
   @Override
   public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
     try {
       final ContainerCommandRequestProto requestProto =
           getRequestProto(entry.getSmLogEntry().getStateMachineData());
-      if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
-        String containerName =
-            requestProto.getCreateContainer().getContainerData().getName();
-        createContainerFutureMap.
-            computeIfAbsent(containerName, k -> new CompletableFuture<>());
-        return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
-      } else {
-        final WriteChunkRequestProto write = requestProto.getWriteChunk();
-        String containerName = write.getPipeline().getContainerName();
-        CompletableFuture<Message> future =
-            createContainerFutureMap.get(containerName);
-        CompletableFuture<Message> writeChunkFuture;
-        if (future != null) {
-          writeChunkFuture = future.thenApplyAsync(
-              v -> runCommand(requestProto), writeChunkExecutor);
-        } else {
-          writeChunkFuture = CompletableFuture.supplyAsync(
-              () -> runCommand(requestProto), writeChunkExecutor);
-        }
-        writeChunkFutureMap
-            .put(write.getChunkData().getChunkName(), writeChunkFuture);
-        return writeChunkFuture;
+      ContainerProtos.Type cmdType = requestProto.getCmdType();
+      switch (cmdType) {
+      case CreateContainer:
+        return handleCreateContainer(requestProto);
+      case WriteChunk:
+        return handleWriteChunk(requestProto, entry.getIndex());
+      default:
+        throw new IllegalStateException("Cmd Type:" + cmdType
+            + " should not have state machine data");
       }
     } catch (IOException e) {
       return completeExceptionally(e);
@@ -226,13 +238,11 @@
   }

   @Override
-  public CompletableFuture<RaftClientReply> query(RaftClientRequest request) {
+  public CompletableFuture<Message> query(Message request) {
     try {
       final ContainerCommandRequestProto requestProto =
-          getRequestProto(request.getMessage().getContent());
-      RaftClientReply raftClientReply =
-          new RaftClientReply(request, runCommand(requestProto));
-      return CompletableFuture.completedFuture(raftClientReply);
+          getRequestProto(request.getContent());
+      return CompletableFuture.completedFuture(runCommand(requestProto));
     } catch (IOException e) {
       return completeExceptionally(e);
     }
@@ -243,19 +253,20 @@
     try {
       ContainerCommandRequestProto requestProto =
           getRequestProto(trx.getSMLogEntry().getData());
-      if (requestProto.getCmdType() == ContainerProtos.Type.WriteChunk) {
+      ContainerProtos.Type cmdType = requestProto.getCmdType();
+      if (cmdType == ContainerProtos.Type.WriteChunk) {
         WriteChunkRequestProto write = requestProto.getWriteChunk();
         // the data field has already been removed in start Transaction
         Preconditions.checkArgument(!write.hasData());
         CompletableFuture<Message> stateMachineFuture =
-            writeChunkFutureMap.remove(write.getChunkData().getChunkName());
+            writeChunkFutureMap.remove(trx.getLogEntry().getIndex());
         return stateMachineFuture
             .thenComposeAsync(v ->
                 CompletableFuture.completedFuture(runCommand(requestProto)));
       } else {
         Message message = runCommand(requestProto);
-        if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+        if (cmdType == ContainerProtos.Type.CreateContainer) {
           String containerName =
               requestProto.getCreateContainer().getContainerData().getName();
           createContainerFutureMap.remove(containerName).complete(message);
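
Note: besides adapting query() to the new Ratis signature (Message in, Message out, no RaftClientReply wrapper), the patch re-keys writeChunkFutureMap by Raft log index instead of chunk name; the index is unique per log entry, while a chunk name can recur (for example across retries) and let one write's future clobber another's. A small self-contained sketch of the ordering pattern the refactored handleWriteChunk uses, with hypothetical names standing in for the container dispatcher:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of the write path: a chunk write must wait for its container's
// creation to finish; otherwise it can run immediately on the executor.
public final class WriteChunkOrderingSketch {
  private final ConcurrentHashMap<String, CompletableFuture<String>>
      createContainerFutureMap = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<Long, CompletableFuture<String>>
      writeChunkFutureMap = new ConcurrentHashMap<>();
  private final ExecutorService writeChunkExecutor =
      Executors.newFixedThreadPool(4);

  // Stand-in for dispatching the request to the container dispatcher.
  private String runCommand(String request) {
    return "applied: " + request;
  }

  CompletableFuture<String> handleWriteChunk(
      String containerName, String request, long entryIndex) {
    CompletableFuture<String> create =
        createContainerFutureMap.get(containerName);
    CompletableFuture<String> writeChunkFuture = (create != null)
        // Chain behind the pending container creation.
        ? create.thenApplyAsync(v -> runCommand(request), writeChunkExecutor)
        // No creation in flight; run directly.
        : CompletableFuture.supplyAsync(
            () -> runCommand(request), writeChunkExecutor);
    // Key by log index: unique per entry, unlike a chunk name.
    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
    return writeChunkFuture;
  }
}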

TestDataValidate.java (new file)

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.tools;

import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;

/**
 * Tests Freon with MiniOzoneCluster and validates data.
 */
public class TestDataValidate {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;

/**
 * Create a MiniOzoneCluster for testing.
 * <p>
 * Ozone is made active by setting OZONE_ENABLED = true and
 * OZONE_HANDLER_TYPE_KEY = "distributed".
 *
 * @throws IOException
 */
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
.numDataNodes(5).build();
}

/**
 * Shutdown the MiniOzoneCluster.
 */
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}

@Test
public void ratisTestLargeKey() throws Exception {
List<String> args = new ArrayList<>();
args.add("-validateWrites");
args.add("-numOfVolumes");
args.add("1");
args.add("-numOfBuckets");
args.add("1");
args.add("-numOfKeys");
args.add("1");
args.add("-ratis");
args.add("3");
args.add("-keySize");
args.add("104857600");
Freon freon = new Freon(conf);
int res = ToolRunner.run(conf, freon,
args.toArray(new String[0]));
Assert.assertEquals(1, freon.getNumberOfVolumesCreated());
Assert.assertEquals(1, freon.getNumberOfBucketsCreated());
Assert.assertEquals(1, freon.getNumberOfKeysAdded());
Assert.assertEquals(0, freon.getUnsuccessfulValidationCount());
Assert.assertEquals(0, res);
}

@Test
public void standaloneTestLargeKey() throws Exception {
List<String> args = new ArrayList<>();
args.add("-validateWrites");
args.add("-numOfVolumes");
args.add("1");
args.add("-numOfBuckets");
args.add("1");
args.add("-numOfKeys");
args.add("1");
args.add("-keySize");
args.add("104857600");
Freon freon = new Freon(conf);
int res = ToolRunner.run(conf, freon,
args.toArray(new String[0]));
Assert.assertEquals(1, freon.getNumberOfVolumesCreated());
Assert.assertEquals(1, freon.getNumberOfBucketsCreated());
Assert.assertEquals(1, freon.getNumberOfKeysAdded());
Assert.assertEquals(0, freon.getUnsuccessfulValidationCount());
Assert.assertEquals(0, res);
}

@Test
public void validateWriteTest() throws Exception {
PrintStream originalStream = System.out;
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
System.setOut(new PrintStream(outStream));
List<String> args = new ArrayList<>();
args.add("-validateWrites");
args.add("-numOfVolumes");
args.add("2");
args.add("-numOfBuckets");
args.add("5");
args.add("-numOfKeys");
args.add("10");
Freon freon = new Freon(conf);
int res = ToolRunner.run(conf, freon,
args.toArray(new String[0]));
Assert.assertEquals(0, res);
Assert.assertEquals(2, freon.getNumberOfVolumesCreated());
Assert.assertEquals(10, freon.getNumberOfBucketsCreated());
Assert.assertEquals(100, freon.getNumberOfKeysAdded());
Assert.assertTrue(freon.getValidateWrites());
Assert.assertNotEquals(0, freon.getTotalKeysValidated());
Assert.assertNotEquals(0, freon.getSuccessfulValidationCount());
Assert.assertEquals(0, freon.getUnsuccessfulValidationCount());
System.setOut(originalStream);
}
}
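
Note: the two large-key tests above are the regression check for this change. -keySize 104857600 is 100 MB, well past the 4 MB default gRPC message cap that made reads fail before. A quick sanity check of the sizes involved (plain Java, hypothetical class name):

public final class KeySizeCheck {
  public static void main(String[] args) {
    final long keySize = 104857600L;              // -keySize above: 100 * 1024 * 1024
    final long grpcDefaultMax = 4L * 1024 * 1024; // gRPC default max message size
    // Each key is 25 times larger than what fit in one message before the fix.
    System.out.println(keySize / grpcDefaultMax); // prints 25
  }
}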

TestFreon.java

@@ -29,9 +29,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;

-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.PrintStream;

 import java.util.ArrayList;
 import java.util.List;
@@ -90,33 +88,6 @@ public class TestFreon {
     Assert.assertEquals(0, res);
   }

-  @Test
-  public void validateWriteTest() throws Exception {
-    PrintStream originalStream = System.out;
-    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-    System.setOut(new PrintStream(outStream));
-    List<String> args = new ArrayList<>();
-    args.add("-validateWrites");
-    args.add("-numOfVolumes");
-    args.add("2");
-    args.add("-numOfBuckets");
-    args.add("5");
-    args.add("-numOfKeys");
-    args.add("10");
-    Freon freon = new Freon(conf);
-    int res = ToolRunner.run(conf, freon,
-        args.toArray(new String[0]));
-    Assert.assertEquals(0, res);
-    Assert.assertEquals(2, freon.getNumberOfVolumesCreated());
-    Assert.assertEquals(10, freon.getNumberOfBucketsCreated());
-    Assert.assertEquals(100, freon.getNumberOfKeysAdded());
-    Assert.assertTrue(freon.getValidateWrites());
-    Assert.assertNotEquals(0, freon.getTotalKeysValidated());
-    Assert.assertNotEquals(0, freon.getSuccessfulValidationCount());
-    Assert.assertEquals(0, freon.getUnsuccessfulValidationCount());
-    System.setOut(originalStream);
-  }
-
   @Test
   public void multiThread() throws Exception {
     List<String> args = new ArrayList<>();