HDDS-213. Single lock to synchronize KeyValueContainer#update.
This commit is contained in:
parent
cb9574a337
commit
44e19fc7f7
|
@ -182,22 +182,34 @@ public class ContainerData {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds metadata.
|
* Add/Update metadata.
|
||||||
|
* We should hold the container lock before updating the metadata as this
|
||||||
|
* will be persisted on disk. Unless, we are reconstructing ContainerData
|
||||||
|
* from protoBuf or from on disk .container file in which case lock is not
|
||||||
|
* required.
|
||||||
*/
|
*/
|
||||||
public void addMetadata(String key, String value) throws IOException {
|
public void addMetadata(String key, String value) {
|
||||||
synchronized (this.metadata) {
|
|
||||||
metadata.put(key, value);
|
metadata.put(key, value);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retuns metadata of the container.
|
* Retuns metadata of the container.
|
||||||
* @return metadata
|
* @return metadata
|
||||||
*/
|
*/
|
||||||
public Map<String, String> getMetadata() {
|
public Map<String, String> getMetadata() {
|
||||||
synchronized (this.metadata) {
|
|
||||||
return Collections.unmodifiableMap(this.metadata);
|
return Collections.unmodifiableMap(this.metadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set metadata.
|
||||||
|
* We should hold the container lock before updating the metadata as this
|
||||||
|
* will be persisted on disk. Unless, we are reconstructing ContainerData
|
||||||
|
* from protoBuf or from on disk .container file in which case lock is not
|
||||||
|
* required.
|
||||||
|
*/
|
||||||
|
public void setMetadata(Map<String, String> metadataMap) {
|
||||||
|
metadata.clear();
|
||||||
|
metadata.putAll(metadataMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -200,15 +200,7 @@ public final class ContainerDataYaml {
|
||||||
OzoneConsts.METADATA_PATH));
|
OzoneConsts.METADATA_PATH));
|
||||||
kvData.setChunksPath((String) nodes.get(OzoneConsts.CHUNKS_PATH));
|
kvData.setChunksPath((String) nodes.get(OzoneConsts.CHUNKS_PATH));
|
||||||
Map<String, String> meta = (Map) nodes.get(OzoneConsts.METADATA);
|
Map<String, String> meta = (Map) nodes.get(OzoneConsts.METADATA);
|
||||||
meta.forEach((key, val) -> {
|
kvData.setMetadata(meta);
|
||||||
try {
|
|
||||||
kvData.addMetadata(key, val);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new IllegalStateException("Unexpected " +
|
|
||||||
"Key Value Pair " + "(" + key + "," + val +")in the metadata " +
|
|
||||||
"for containerId " + (long) nodes.get("containerId"));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
String state = (String) nodes.get(OzoneConsts.STATE);
|
String state = (String) nodes.get(OzoneConsts.STATE);
|
||||||
switch (state) {
|
switch (state) {
|
||||||
case "OPEN":
|
case "OPEN":
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
package org.apache.hadoop.ozone.container.keyvalue;
|
package org.apache.hadoop.ozone.container.keyvalue;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.StandardCopyOption;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
@ -32,7 +34,6 @@ import org.apache.hadoop.io.nativeio.NativeIO;
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
|
||||||
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
|
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
|
||||||
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
|
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
|
||||||
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
|
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
|
||||||
|
@ -58,8 +59,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
.Result.CONTAINER_ALREADY_EXISTS;
|
.Result.CONTAINER_ALREADY_EXISTS;
|
||||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
||||||
.Result.CONTAINER_METADATA_ERROR;
|
|
||||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
.Result.CONTAINER_INTERNAL_ERROR;
|
.Result.CONTAINER_INTERNAL_ERROR;
|
||||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
|
@ -146,7 +145,7 @@ public class KeyValueContainer implements Container {
|
||||||
containerData.setVolume(containerVolume);
|
containerData.setVolume(containerVolume);
|
||||||
|
|
||||||
// Create .container file and .chksm file
|
// Create .container file and .chksm file
|
||||||
createContainerFile(containerFile, containerCheckSumFile);
|
writeToContainerFile(containerFile, containerCheckSumFile, true);
|
||||||
|
|
||||||
|
|
||||||
} catch (StorageContainerException ex) {
|
} catch (StorageContainerException ex) {
|
||||||
|
@ -177,36 +176,50 @@ public class KeyValueContainer implements Container {
|
||||||
* Creates .container file and checksum file.
|
* Creates .container file and checksum file.
|
||||||
*
|
*
|
||||||
* @param containerFile
|
* @param containerFile
|
||||||
* @param containerCheckSumFile
|
* @param checksumFile
|
||||||
|
* @param isCreate true if we are creating a new container file and false if
|
||||||
|
* we are updating an existing container file.
|
||||||
* @throws StorageContainerException
|
* @throws StorageContainerException
|
||||||
*/
|
*/
|
||||||
private void createContainerFile(File containerFile, File
|
private void writeToContainerFile(File containerFile, File
|
||||||
containerCheckSumFile) throws StorageContainerException {
|
checksumFile, boolean isCreate)
|
||||||
|
throws StorageContainerException {
|
||||||
File tempContainerFile = null;
|
File tempContainerFile = null;
|
||||||
File tempCheckSumFile = null;
|
File tempChecksumFile = null;
|
||||||
FileOutputStream containerCheckSumStream = null;
|
FileOutputStream containerCheckSumStream = null;
|
||||||
Writer writer = null;
|
Writer writer = null;
|
||||||
long containerId = containerData.getContainerID();
|
long containerId = containerData.getContainerID();
|
||||||
try {
|
try {
|
||||||
tempContainerFile = createTempFile(containerFile);
|
tempContainerFile = createTempFile(containerFile);
|
||||||
tempCheckSumFile = createTempFile(containerCheckSumFile);
|
tempChecksumFile = createTempFile(checksumFile);
|
||||||
ContainerDataYaml.createContainerFile(ContainerProtos.ContainerType
|
ContainerDataYaml.createContainerFile(ContainerProtos.ContainerType
|
||||||
.KeyValueContainer, tempContainerFile, containerData);
|
.KeyValueContainer, tempContainerFile, containerData);
|
||||||
|
|
||||||
//Compute Checksum for container file
|
//Compute Checksum for container file
|
||||||
String checksum = KeyValueContainerUtil.computeCheckSum(containerId,
|
String checksum = KeyValueContainerUtil.computeCheckSum(containerId,
|
||||||
tempContainerFile);
|
tempContainerFile);
|
||||||
containerCheckSumStream = new FileOutputStream(tempCheckSumFile);
|
containerCheckSumStream = new FileOutputStream(tempChecksumFile);
|
||||||
writer = new OutputStreamWriter(containerCheckSumStream, "UTF-8");
|
writer = new OutputStreamWriter(containerCheckSumStream, "UTF-8");
|
||||||
writer.write(checksum);
|
writer.write(checksum);
|
||||||
writer.flush();
|
writer.flush();
|
||||||
|
|
||||||
|
if (isCreate) {
|
||||||
|
// When creating a new container, .container file should not exist
|
||||||
|
// already.
|
||||||
NativeIO.renameTo(tempContainerFile, containerFile);
|
NativeIO.renameTo(tempContainerFile, containerFile);
|
||||||
NativeIO.renameTo(tempCheckSumFile, containerCheckSumFile);
|
NativeIO.renameTo(tempChecksumFile, checksumFile);
|
||||||
|
} else {
|
||||||
|
// When updating a container, the .container file should exist. If
|
||||||
|
// not, the container is in an inconsistent state.
|
||||||
|
Files.move(tempContainerFile.toPath(), containerFile.toPath(),
|
||||||
|
StandardCopyOption.REPLACE_EXISTING);
|
||||||
|
Files.move(tempChecksumFile.toPath(), checksumFile.toPath(),
|
||||||
|
StandardCopyOption.REPLACE_EXISTING);
|
||||||
|
}
|
||||||
|
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
throw new StorageContainerException("Error during creation of " +
|
throw new StorageContainerException("Error during creation of " +
|
||||||
"required files(.container, .chksm) for container. Container Name: "
|
"required files(.container, .chksm) for container. ContainerID: "
|
||||||
+ containerId, ex, CONTAINER_FILES_CREATE_ERROR);
|
+ containerId, ex, CONTAINER_FILES_CREATE_ERROR);
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeStream(containerCheckSumStream);
|
IOUtils.closeStream(containerCheckSumStream);
|
||||||
|
@ -216,8 +229,8 @@ public class KeyValueContainer implements Container {
|
||||||
tempContainerFile.getAbsolutePath());
|
tempContainerFile.getAbsolutePath());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (tempCheckSumFile != null && tempCheckSumFile.exists()) {
|
if (tempChecksumFile != null && tempChecksumFile.exists()) {
|
||||||
if (!tempCheckSumFile.delete()) {
|
if (!tempChecksumFile.delete()) {
|
||||||
LOG.warn("Unable to delete container temporary checksum file: {}.",
|
LOG.warn("Unable to delete container temporary checksum file: {}.",
|
||||||
tempContainerFile.getAbsolutePath());
|
tempContainerFile.getAbsolutePath());
|
||||||
}
|
}
|
||||||
|
@ -236,69 +249,25 @@ public class KeyValueContainer implements Container {
|
||||||
|
|
||||||
|
|
||||||
private void updateContainerFile(File containerFile, File
|
private void updateContainerFile(File containerFile, File
|
||||||
containerCheckSumFile) throws StorageContainerException {
|
checksumFile) throws StorageContainerException {
|
||||||
|
|
||||||
File containerBkpFile = null;
|
|
||||||
File checkSumBkpFile = null;
|
|
||||||
long containerId = containerData.getContainerID();
|
long containerId = containerData.getContainerID();
|
||||||
|
|
||||||
|
if (containerFile.exists() && checksumFile.exists()) {
|
||||||
try {
|
try {
|
||||||
if (containerFile.exists() && containerCheckSumFile.exists()) {
|
writeToContainerFile(containerFile, checksumFile, false);
|
||||||
//Take backup of original files (.container and .chksm files)
|
} catch (IOException e) {
|
||||||
containerBkpFile = new File(containerFile + ".bkp");
|
//TODO : Container update failure is not handled currently. Might
|
||||||
checkSumBkpFile = new File(containerCheckSumFile + ".bkp");
|
// lead to loss of .container file. When Update container feature
|
||||||
NativeIO.renameTo(containerFile, containerBkpFile);
|
// support is added, this failure should also be handled.
|
||||||
NativeIO.renameTo(containerCheckSumFile, checkSumBkpFile);
|
throw new StorageContainerException("Container update failed. " +
|
||||||
createContainerFile(containerFile, containerCheckSumFile);
|
"ContainerID: " + containerId, CONTAINER_FILES_CREATE_ERROR);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
containerData.setState(ContainerProtos.ContainerLifeCycleState.INVALID);
|
|
||||||
throw new StorageContainerException("Container is an Inconsistent " +
|
throw new StorageContainerException("Container is an Inconsistent " +
|
||||||
"state, missing required files(.container, .chksm). ContainerID: " +
|
"state, missing required files(.container, .chksm). ContainerID: " +
|
||||||
containerId, INVALID_CONTAINER_STATE);
|
containerId, INVALID_CONTAINER_STATE);
|
||||||
}
|
}
|
||||||
} catch (StorageContainerException ex) {
|
|
||||||
throw ex;
|
|
||||||
} catch (IOException ex) {
|
|
||||||
// Restore from back up files.
|
|
||||||
try {
|
|
||||||
if (containerBkpFile != null && containerBkpFile
|
|
||||||
.exists() && containerFile.delete()) {
|
|
||||||
LOG.info("update failed for container Name: {}, restoring container" +
|
|
||||||
" file", containerId);
|
|
||||||
NativeIO.renameTo(containerBkpFile, containerFile);
|
|
||||||
}
|
|
||||||
if (checkSumBkpFile != null && checkSumBkpFile.exists() &&
|
|
||||||
containerCheckSumFile.delete()) {
|
|
||||||
LOG.info("update failed for container Name: {}, restoring checksum" +
|
|
||||||
" file", containerId);
|
|
||||||
NativeIO.renameTo(checkSumBkpFile, containerCheckSumFile);
|
|
||||||
}
|
|
||||||
throw new StorageContainerException("Error during updating of " +
|
|
||||||
"required files(.container, .chksm) for container. Container Name: "
|
|
||||||
+ containerId, ex, CONTAINER_FILES_CREATE_ERROR);
|
|
||||||
} catch (IOException e) {
|
|
||||||
containerData.setState(ContainerProtos.ContainerLifeCycleState.INVALID);
|
|
||||||
LOG.error("During restore failed for container Name: " +
|
|
||||||
containerId);
|
|
||||||
throw new StorageContainerException(
|
|
||||||
"Failed to restore container data from the backup. ID: "
|
|
||||||
+ containerId, CONTAINER_FILES_CREATE_ERROR);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (containerBkpFile != null && containerBkpFile
|
|
||||||
.exists()) {
|
|
||||||
if(!containerBkpFile.delete()) {
|
|
||||||
LOG.warn("Unable to delete container backup file: {}",
|
|
||||||
containerBkpFile);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (checkSumBkpFile != null && checkSumBkpFile.exists()) {
|
|
||||||
if(!checkSumBkpFile.delete()) {
|
|
||||||
LOG.warn("Unable to delete container checksum backup file: {}",
|
|
||||||
checkSumBkpFile);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -393,22 +362,21 @@ public class KeyValueContainer implements Container {
|
||||||
"Updating a closed container without force option is not allowed. " +
|
"Updating a closed container without force option is not allowed. " +
|
||||||
"ContainerID: " + containerId, UNSUPPORTED_REQUEST);
|
"ContainerID: " + containerId, UNSUPPORTED_REQUEST);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Map<String, String> oldMetadata = containerData.getMetadata();
|
||||||
try {
|
try {
|
||||||
|
writeLock();
|
||||||
for (Map.Entry<String, String> entry : metadata.entrySet()) {
|
for (Map.Entry<String, String> entry : metadata.entrySet()) {
|
||||||
containerData.addMetadata(entry.getKey(), entry.getValue());
|
containerData.addMetadata(entry.getKey(), entry.getValue());
|
||||||
}
|
}
|
||||||
} catch (IOException ex) {
|
|
||||||
throw new StorageContainerException("Container Metadata update error" +
|
|
||||||
". Container Name:" + containerId, ex, CONTAINER_METADATA_ERROR);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
writeLock();
|
|
||||||
String containerName = String.valueOf(containerId);
|
|
||||||
File containerFile = getContainerFile();
|
File containerFile = getContainerFile();
|
||||||
File containerCheckSumFile = getContainerCheckSumFile();
|
File containerCheckSumFile = getContainerCheckSumFile();
|
||||||
// update the new container data to .container File
|
// update the new container data to .container File
|
||||||
updateContainerFile(containerFile, containerCheckSumFile);
|
updateContainerFile(containerFile, containerCheckSumFile);
|
||||||
} catch (StorageContainerException ex) {
|
} catch (StorageContainerException ex) {
|
||||||
|
// TODO:
|
||||||
|
// On error, reset the metadata.
|
||||||
|
containerData.setMetadata(oldMetadata);
|
||||||
throw ex;
|
throw ex;
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
|
|
|
@ -109,6 +109,7 @@ public class ContainerReader implements Runnable {
|
||||||
for (File containerTopDir : containerTopDirs) {
|
for (File containerTopDir : containerTopDirs) {
|
||||||
if (containerTopDir.isDirectory()) {
|
if (containerTopDir.isDirectory()) {
|
||||||
File[] containerDirs = containerTopDir.listFiles();
|
File[] containerDirs = containerTopDir.listFiles();
|
||||||
|
if (containerDirs != null) {
|
||||||
for (File containerDir : containerDirs) {
|
for (File containerDir : containerDirs) {
|
||||||
File metadataPath = new File(containerDir + File.separator +
|
File metadataPath = new File(containerDir + File.separator +
|
||||||
OzoneConsts.CONTAINER_META_PATH);
|
OzoneConsts.CONTAINER_META_PATH);
|
||||||
|
@ -122,7 +123,8 @@ public class ContainerReader implements Runnable {
|
||||||
verifyContainerFile(containerName, containerFile,
|
verifyContainerFile(containerName, containerFile,
|
||||||
checksumFile);
|
checksumFile);
|
||||||
} else {
|
} else {
|
||||||
LOG.error("Missing container metadata files for Container: " +
|
LOG.error(
|
||||||
|
"Missing container metadata files for Container: " +
|
||||||
"{}", containerName);
|
"{}", containerName);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -135,6 +137,7 @@ public class ContainerReader implements Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void verifyContainerFile(String containerName, File containerFile,
|
private void verifyContainerFile(String containerName, File containerFile,
|
||||||
File checksumFile) {
|
File checksumFile) {
|
||||||
|
|
Loading…
Reference in New Issue