HDFS-12178. Ozone: OzoneClient: Handling SCM container creationFlag at client side. Contributed by Nandakumar.
This commit is contained in:
parent
507447c816
commit
72e3016224
|
@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.web.storage;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
|
import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
|
||||||
import org.apache.hadoop.scm.XceiverClientManager;
|
import org.apache.hadoop.scm.XceiverClientManager;
|
||||||
|
@ -34,6 +35,8 @@ import org.slf4j.LoggerFactory;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Maintaining a list of ChunkInputStream. Write based on offset.
|
* Maintaining a list of ChunkInputStream. Write based on offset.
|
||||||
|
@ -55,6 +58,10 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
private long totalSize;
|
private long totalSize;
|
||||||
private long byteOffset;
|
private long byteOffset;
|
||||||
|
|
||||||
|
//This has to be removed once HDFS-11888 is resolved.
|
||||||
|
//local cache which will have list of created container names.
|
||||||
|
private static Set<String> containersCreated = new HashSet<>();
|
||||||
|
|
||||||
public ChunkGroupOutputStream() {
|
public ChunkGroupOutputStream() {
|
||||||
this.streamEntries = new ArrayList<>();
|
this.streamEntries = new ArrayList<>();
|
||||||
this.currentStreamIndex = 0;
|
this.currentStreamIndex = 0;
|
||||||
|
@ -285,13 +292,27 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
xceiverClientManager.acquireClient(pipeline);
|
xceiverClientManager.acquireClient(pipeline);
|
||||||
// create container if needed
|
// create container if needed
|
||||||
// TODO : should be subKeyInfo.getShouldCreateContainer(), but for now
|
// TODO : should be subKeyInfo.getShouldCreateContainer(), but for now
|
||||||
// always true.
|
//The following change has to reverted once HDFS-11888 is fixed.
|
||||||
boolean shouldCreate = true;
|
if(!containersCreated.contains(containerName)) {
|
||||||
if (shouldCreate) {
|
synchronized (containerName.intern()) {
|
||||||
try {
|
//checking again, there is a chance that some other thread has
|
||||||
ContainerProtocolCalls.createContainer(xceiverClient, requestId);
|
// created it.
|
||||||
} catch (StorageContainerException sce) {
|
if (!containersCreated.contains(containerName)) {
|
||||||
LOG.warn("Create container failed with {}", containerName, sce);
|
LOG.debug("Need to create container {}.", containerName);
|
||||||
|
try {
|
||||||
|
ContainerProtocolCalls.createContainer(xceiverClient, requestId);
|
||||||
|
} catch (StorageContainerException ex) {
|
||||||
|
if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
|
||||||
|
//container already exist.
|
||||||
|
LOG.debug("Container {} already exists.", containerName);
|
||||||
|
} else {
|
||||||
|
LOG.error("Container creation failed for {}.",
|
||||||
|
containerName, ex);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
containersCreated.add(containerName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue