HDDS-1535. Space tracking for Open Containers : Handle Node Startup. Contributed by Supratim Deka (#832)
* HDDS-1535. Space tracking for Open Containers : Handle Node Startup. Contributed by Supratim Deka * Fixed checkstyle issues. * verifyContainerData also does fixup, renamed. Added a Javadoc comment, both as per review discussion * fixed merge error. adapted to new signature of BlockUtils getDB * fixed checkstyle issue post merge
This commit is contained in:
commit
b4b9120e28
|
@ -27,11 +27,14 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
|
|||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.common.Storage;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
|
||||
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
|
||||
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueBlockIterator;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
|
||||
|
@ -45,6 +48,7 @@ import org.slf4j.LoggerFactory;
|
|||
import java.io.File;
|
||||
import java.io.FileFilter;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Class used to read .container files from Volume and build container map.
|
||||
|
@ -161,14 +165,21 @@ public class ContainerReader implements Runnable {
|
|||
"Skipping loading of this container.", containerFile);
|
||||
return;
|
||||
}
|
||||
verifyContainerData(containerData);
|
||||
verifyAndFixupContainerData(containerData);
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Failed to parse ContainerFile for ContainerID: {}",
|
||||
containerID, ex);
|
||||
}
|
||||
}
|
||||
|
||||
public void verifyContainerData(ContainerData containerData)
|
||||
/**
|
||||
* verify ContainerData loaded from disk and fix-up stale members.
|
||||
* Specifically blockCommitSequenceId, delete related metadata
|
||||
* and bytesUsed
|
||||
* @param containerData
|
||||
* @throws IOException
|
||||
*/
|
||||
public void verifyAndFixupContainerData(ContainerData containerData)
|
||||
throws IOException {
|
||||
switch (containerData.getContainerType()) {
|
||||
case KeyValueContainer:
|
||||
|
@ -203,6 +214,11 @@ public class ContainerReader implements Runnable {
|
|||
kvContainerData
|
||||
.updateBlockCommitSequenceId(Longs.fromByteArray(bcsId));
|
||||
}
|
||||
if (kvContainer.getContainerState()
|
||||
== ContainerProtos.ContainerDataProto.State.OPEN) {
|
||||
// commitSpace for Open Containers relies on usedBytes
|
||||
initializeUsedBytes(kvContainer);
|
||||
}
|
||||
containerSet.addContainer(kvContainer);
|
||||
}
|
||||
} else {
|
||||
|
@ -218,4 +234,27 @@ public class ContainerReader implements Runnable {
|
|||
ContainerProtos.Result.UNKNOWN_CONTAINER_TYPE);
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeUsedBytes(KeyValueContainer container)
|
||||
throws IOException {
|
||||
KeyValueBlockIterator blockIter = new KeyValueBlockIterator(
|
||||
container.getContainerData().getContainerID(),
|
||||
new File(container.getContainerData().getContainerPath()));
|
||||
long usedBytes = 0;
|
||||
|
||||
while (blockIter.hasNext()) {
|
||||
BlockData block = blockIter.nextBlock();
|
||||
long blockLen = 0;
|
||||
|
||||
List<ContainerProtos.ChunkInfo> chunkInfoList = block.getChunks();
|
||||
for (ContainerProtos.ChunkInfo chunk : chunkInfoList) {
|
||||
ChunkInfo info = ChunkInfo.getFromProtoBuf(chunk);
|
||||
blockLen += info.getLen();
|
||||
}
|
||||
|
||||
usedBytes += blockLen;
|
||||
}
|
||||
|
||||
container.getContainerData().setBytesUsed(usedBytes);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,19 +19,27 @@
|
|||
package org.apache.hadoop.ozone.container.ozoneimpl;
|
||||
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.primitives.Longs;
|
||||
import org.apache.hadoop.conf.StorageUnit;
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
|
||||
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
|
||||
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
|
||||
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
|
||||
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
@ -40,7 +48,9 @@ import org.mockito.Mockito;
|
|||
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
|
@ -52,7 +62,6 @@ public class TestOzoneContainer {
|
|||
@Rule
|
||||
public TemporaryFolder folder = new TemporaryFolder();
|
||||
|
||||
|
||||
private OzoneConfiguration conf;
|
||||
private String scmId = UUID.randomUUID().toString();
|
||||
private VolumeSet volumeSet;
|
||||
|
@ -60,6 +69,8 @@ public class TestOzoneContainer {
|
|||
private KeyValueContainerData keyValueContainerData;
|
||||
private KeyValueContainer keyValueContainer;
|
||||
private final DatanodeDetails datanodeDetails = createDatanodeDetails();
|
||||
private HashMap<String, Long> commitSpaceMap; //RootDir -> committed space
|
||||
private final int numTestContainers = 10;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
@ -68,6 +79,7 @@ public class TestOzoneContainer {
|
|||
.getAbsolutePath());
|
||||
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
|
||||
folder.newFolder().getAbsolutePath());
|
||||
commitSpaceMap = new HashMap<String, Long>();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -78,16 +90,32 @@ public class TestOzoneContainer {
|
|||
// Format the volumes
|
||||
for (HddsVolume volume : volumeSet.getVolumesList()) {
|
||||
volume.format(UUID.randomUUID().toString());
|
||||
commitSpaceMap.put(getVolumeKey(volume), Long.valueOf(0));
|
||||
}
|
||||
|
||||
// Add containers to disk
|
||||
for (int i=0; i<10; i++) {
|
||||
for (int i = 0; i < numTestContainers; i++) {
|
||||
long freeBytes = 0;
|
||||
long volCommitBytes;
|
||||
long maxCap = (long) StorageUnit.GB.toBytes(1);
|
||||
|
||||
HddsVolume myVolume;
|
||||
|
||||
keyValueContainerData = new KeyValueContainerData(i,
|
||||
(long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
|
||||
maxCap, UUID.randomUUID().toString(),
|
||||
datanodeDetails.getUuidString());
|
||||
keyValueContainer = new KeyValueContainer(
|
||||
keyValueContainerData, conf);
|
||||
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
|
||||
myVolume = keyValueContainer.getContainerData().getVolume();
|
||||
|
||||
freeBytes = addBlocks(keyValueContainer, 2, 3);
|
||||
|
||||
// update our expectation of volume committed space in the map
|
||||
volCommitBytes = commitSpaceMap.get(getVolumeKey(myVolume)).longValue();
|
||||
Preconditions.checkState(freeBytes >= 0);
|
||||
commitSpaceMap.put(getVolumeKey(myVolume),
|
||||
Long.valueOf(volCommitBytes + freeBytes));
|
||||
}
|
||||
|
||||
DatanodeStateMachine stateMachine = Mockito.mock(
|
||||
|
@ -97,12 +125,65 @@ public class TestOzoneContainer {
|
|||
Mockito.when(context.getParent()).thenReturn(stateMachine);
|
||||
// When OzoneContainer is started, the containers from disk should be
|
||||
// loaded into the containerSet.
|
||||
// Also expected to initialize committed space for each volume.
|
||||
OzoneContainer ozoneContainer = new
|
||||
OzoneContainer(datanodeDetails, conf, context, null);
|
||||
|
||||
ContainerSet containerset = ozoneContainer.getContainerSet();
|
||||
assertEquals(10, containerset.containerCount());
|
||||
assertEquals(numTestContainers, containerset.containerCount());
|
||||
|
||||
verifyCommittedSpace(ozoneContainer);
|
||||
}
|
||||
|
||||
//verify committed space on each volume
|
||||
private void verifyCommittedSpace(OzoneContainer oc) {
|
||||
for (HddsVolume dnVol : oc.getVolumeSet().getVolumesList()) {
|
||||
String key = getVolumeKey(dnVol);
|
||||
long expectedCommit = commitSpaceMap.get(key).longValue();
|
||||
long volumeCommitted = dnVol.getCommittedBytes();
|
||||
assertEquals("Volume committed space not initialized correctly",
|
||||
expectedCommit, volumeCommitted);
|
||||
}
|
||||
}
|
||||
|
||||
private long addBlocks(KeyValueContainer container,
|
||||
int blocks, int chunksPerBlock) throws Exception {
|
||||
String strBlock = "block";
|
||||
String strChunk = "-chunkFile";
|
||||
int datalen = 65536;
|
||||
long usedBytes = 0;
|
||||
|
||||
long freeBytes = container.getContainerData().getMaxSize();
|
||||
long containerId = container.getContainerData().getContainerID();
|
||||
ContainerCache.ReferenceCountedDB db = BlockUtils.getDB(container
|
||||
.getContainerData(), conf);
|
||||
|
||||
for (int bi = 0; bi < blocks; bi++) {
|
||||
// Creating BlockData
|
||||
BlockID blockID = new BlockID(containerId, bi);
|
||||
BlockData blockData = new BlockData(blockID);
|
||||
List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
|
||||
|
||||
chunkList.clear();
|
||||
for (int ci = 0; ci < chunksPerBlock; ci++) {
|
||||
String chunkName = strBlock + bi + strChunk + ci;
|
||||
long offset = ci * datalen;
|
||||
ChunkInfo info = new ChunkInfo(chunkName, offset, datalen);
|
||||
usedBytes += datalen;
|
||||
chunkList.add(info.getProtoBufMessage());
|
||||
}
|
||||
blockData.setChunks(chunkList);
|
||||
db.getStore().put(Longs.toByteArray(blockID.getLocalID()),
|
||||
blockData.getProtoBufMessage().toByteArray());
|
||||
}
|
||||
|
||||
// remaining available capacity of the container
|
||||
return (freeBytes - usedBytes);
|
||||
}
|
||||
|
||||
private String getVolumeKey(HddsVolume volume) {
|
||||
return volume.getHddsRootDir().getPath();
|
||||
}
|
||||
|
||||
private DatanodeDetails createDatanodeDetails() {
|
||||
Random random = new Random();
|
||||
|
|
Loading…
Reference in New Issue