HDFS-12636. Ozone: OzoneFileSystem: Implement seek functionality for rpc client. Contributed by Lokesh Jain.

This commit is contained in:
Mukul Kumar Singh 2018-02-11 22:58:22 +05:30 committed by Owen O'Malley
parent 377b31ffa1
commit 9272e1021d
14 changed files with 559 additions and 475 deletions

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.ozone.client.io; package org.apache.hadoop.ozone.client.io;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
@ -27,18 +29,21 @@
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.storage.ChunkInputStream; import org.apache.hadoop.scm.storage.ChunkInputStream;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls; import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
/** /**
* Maintaining a list of ChunkInputStream. Read based on offset. * Maintaining a list of ChunkInputStream. Read based on offset.
*/ */
public class ChunkGroupInputStream extends InputStream { public class ChunkGroupInputStream extends InputStream implements Seekable {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(ChunkGroupInputStream.class); LoggerFactory.getLogger(ChunkGroupInputStream.class);
@ -46,7 +51,13 @@ public class ChunkGroupInputStream extends InputStream {
private static final int EOF = -1; private static final int EOF = -1;
private final ArrayList<ChunkInputStreamEntry> streamEntries; private final ArrayList<ChunkInputStreamEntry> streamEntries;
// streamOffset[i] stores the offset at which chunkInputStream i stores
// data in the key
private long[] streamOffset = null;
private int currentStreamIndex; private int currentStreamIndex;
private long length = 0;
private boolean closed = false;
private String key;
public ChunkGroupInputStream() { public ChunkGroupInputStream() {
streamEntries = new ArrayList<>(); streamEntries = new ArrayList<>();
@ -67,18 +78,20 @@ public long getRemainingOfIndex(int index) {
* Append another stream to the end of the list. * Append another stream to the end of the list.
* *
* @param stream the stream instance. * @param stream the stream instance.
* @param length the max number of bytes that should be written to this * @param streamLength the max number of bytes that should be written to this
* stream. * stream.
*/ */
public synchronized void addStream(InputStream stream, long length) { public synchronized void addStream(ChunkInputStream stream,
streamEntries.add(new ChunkInputStreamEntry(stream, length)); long streamLength) {
streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
} }
@Override @Override
public synchronized int read() throws IOException { public synchronized int read() throws IOException {
checkNotClosed();
if (streamEntries.size() <= currentStreamIndex) { if (streamEntries.size() <= currentStreamIndex) {
throw new IndexOutOfBoundsException(); return EOF;
} }
ChunkInputStreamEntry entry = streamEntries.get(currentStreamIndex); ChunkInputStreamEntry entry = streamEntries.get(currentStreamIndex);
int data = entry.read(); int data = entry.read();
@ -87,6 +100,7 @@ public synchronized int read() throws IOException {
@Override @Override
public synchronized int read(byte[] b, int off, int len) throws IOException { public synchronized int read(byte[] b, int off, int len) throws IOException {
checkNotClosed();
if (b == null) { if (b == null) {
throw new NullPointerException(); throw new NullPointerException();
} }
@ -122,15 +136,82 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
return totalReadLen; return totalReadLen;
} }
private static class ChunkInputStreamEntry extends InputStream { @Override
public void seek(long pos) throws IOException {
checkNotClosed();
if (pos < 0 || pos >= length) {
if (pos == 0) {
// It is possible for length and pos to be zero in which case
// seek should return instead of throwing exception
return;
}
throw new EOFException(
"EOF encountered at pos: " + pos + " for key: " + key);
}
Preconditions.assertTrue(currentStreamIndex >= 0);
if (currentStreamIndex >= streamEntries.size()) {
currentStreamIndex = Arrays.binarySearch(streamOffset, pos);
} else if (pos < streamOffset[currentStreamIndex]) {
currentStreamIndex =
Arrays.binarySearch(streamOffset, 0, currentStreamIndex, pos);
} else if (pos >= streamOffset[currentStreamIndex] + streamEntries
.get(currentStreamIndex).length) {
currentStreamIndex = Arrays
.binarySearch(streamOffset, currentStreamIndex + 1,
streamEntries.size(), pos);
}
if (currentStreamIndex < 0) {
// Binary search returns -insertionPoint - 1 if element is not present
// in the array. insertionPoint is the point at which element would be
// inserted in the sorted array. We need to adjust the currentStreamIndex
// accordingly so that currentStreamIndex = insertionPoint - 1
currentStreamIndex = -currentStreamIndex - 2;
}
// seek to the proper offset in the ChunkInputStream
streamEntries.get(currentStreamIndex)
.seek(pos - streamOffset[currentStreamIndex]);
}
private final InputStream inputStream; @Override
public long getPos() throws IOException {
return length == 0 ? 0 :
streamOffset[currentStreamIndex] + streamEntries.get(currentStreamIndex)
.getPos();
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@Override
public int available() throws IOException {
checkNotClosed();
long remaining = length - getPos();
return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE;
}
@Override
public void close() throws IOException {
closed = true;
for (int i = 0; i < streamEntries.size(); i++) {
streamEntries.get(i).close();
}
}
/**
* Encapsulates ChunkInputStream.
*/
public static class ChunkInputStreamEntry extends InputStream
implements Seekable {
private final ChunkInputStream chunkInputStream;
private final long length; private final long length;
private long currentPosition; private long currentPosition;
public ChunkInputStreamEntry(ChunkInputStream chunkInputStream,
ChunkInputStreamEntry(InputStream chunkInputStream, long length) { long length) {
this.inputStream = chunkInputStream; this.chunkInputStream = chunkInputStream;
this.length = length; this.length = length;
this.currentPosition = 0; this.currentPosition = 0;
} }
@ -142,21 +223,36 @@ synchronized long getRemaining() {
@Override @Override
public synchronized int read(byte[] b, int off, int len) public synchronized int read(byte[] b, int off, int len)
throws IOException { throws IOException {
int readLen = inputStream.read(b, off, len); int readLen = chunkInputStream.read(b, off, len);
currentPosition += readLen; currentPosition += readLen;
return readLen; return readLen;
} }
@Override @Override
public synchronized int read() throws IOException { public synchronized int read() throws IOException {
int data = inputStream.read(); int data = chunkInputStream.read();
currentPosition += 1; currentPosition += 1;
return data; return data;
} }
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
inputStream.close(); chunkInputStream.close();
}
@Override
public void seek(long pos) throws IOException {
chunkInputStream.seek(pos);
}
@Override
public long getPos() throws IOException {
return chunkInputStream.getPos();
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
} }
} }
@ -168,8 +264,12 @@ public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
long length = 0; long length = 0;
String containerKey; String containerKey;
ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream(); ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream();
for (KsmKeyLocationInfo ksmKeyLocationInfo : groupInputStream.key = keyInfo.getKeyName();
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly()) { List<KsmKeyLocationInfo> keyLocationInfos =
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
groupInputStream.streamOffset = new long[keyLocationInfos.size()];
for (int i = 0; i < keyLocationInfos.size(); i++) {
KsmKeyLocationInfo ksmKeyLocationInfo = keyLocationInfos.get(i);
String containerName = ksmKeyLocationInfo.getContainerName(); String containerName = ksmKeyLocationInfo.getContainerName();
Pipeline pipeline = Pipeline pipeline =
storageContainerLocationClient.getContainer(containerName); storageContainerLocationClient.getContainer(containerName);
@ -180,6 +280,7 @@ public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
try { try {
LOG.debug("get key accessing {} {}", LOG.debug("get key accessing {} {}",
xceiverClient.getPipeline().getContainerName(), containerKey); xceiverClient.getPipeline().getContainerName(), containerKey);
groupInputStream.streamOffset[i] = length;
ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation
.containerKeyDataForRead( .containerKeyDataForRead(
xceiverClient.getPipeline().getContainerName(), containerKey); xceiverClient.getPipeline().getContainerName(), containerKey);
@ -202,6 +303,19 @@ public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
} }
} }
} }
groupInputStream.length = length;
return new LengthInputStream(groupInputStream, length); return new LengthInputStream(groupInputStream, length);
} }
/**
* Verify that the input stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
*/
private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException(
": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + key);
}
}
} }

View File

@ -19,6 +19,7 @@
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
@ -72,6 +73,7 @@ public class ChunkGroupOutputStream extends OutputStream {
private final XceiverClientManager xceiverClientManager; private final XceiverClientManager xceiverClientManager;
private final int chunkSize; private final int chunkSize;
private final String requestID; private final String requestID;
private boolean closed;
/** /**
* A constructor for testing purpose only. * A constructor for testing purpose only.
@ -86,6 +88,7 @@ public ChunkGroupOutputStream() {
xceiverClientManager = null; xceiverClientManager = null;
chunkSize = 0; chunkSize = 0;
requestID = null; requestID = null;
closed = false;
} }
/** /**
@ -196,6 +199,8 @@ public long getByteOffset() {
@Override @Override
public synchronized void write(int b) throws IOException { public synchronized void write(int b) throws IOException {
checkNotClosed();
if (streamEntries.size() <= currentStreamIndex) { if (streamEntries.size() <= currentStreamIndex) {
Preconditions.checkNotNull(ksmClient); Preconditions.checkNotNull(ksmClient);
// allocate a new block, if a exception happens, log an error and // allocate a new block, if a exception happens, log an error and
@ -230,6 +235,8 @@ public synchronized void write(int b) throws IOException {
@Override @Override
public synchronized void write(byte[] b, int off, int len) public synchronized void write(byte[] b, int off, int len)
throws IOException { throws IOException {
checkNotClosed();
if (b == null) { if (b == null) {
throw new NullPointerException(); throw new NullPointerException();
} }
@ -286,6 +293,7 @@ private void allocateNewBlock(int index) throws IOException {
@Override @Override
public synchronized void flush() throws IOException { public synchronized void flush() throws IOException {
checkNotClosed();
for (int i = 0; i <= currentStreamIndex; i++) { for (int i = 0; i <= currentStreamIndex; i++) {
streamEntries.get(i).flush(); streamEntries.get(i).flush();
} }
@ -298,6 +306,10 @@ public synchronized void flush() throws IOException {
*/ */
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
if (closed) {
return;
}
closed = true;
for (ChunkOutputStreamEntry entry : streamEntries) { for (ChunkOutputStreamEntry entry : streamEntries) {
if (entry != null) { if (entry != null) {
entry.close(); entry.close();
@ -464,4 +476,17 @@ public void close() throws IOException {
} }
} }
} }
/**
* Verify that the output stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
*/
private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException(
": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + keyArgs
.getKeyName());
}
}
} }

View File

@ -49,4 +49,12 @@ public synchronized void close() throws IOException {
inputStream.close(); inputStream.close();
} }
@Override
public int available() throws IOException {
return inputStream.available();
}
public InputStream getInputStream() {
return inputStream;
}
} }

View File

@ -57,4 +57,8 @@ public synchronized void close() throws IOException {
//commitKey can be done here, if needed. //commitKey can be done here, if needed.
outputStream.close(); outputStream.close();
} }
public OutputStream getOutputStream() {
return outputStream;
}
} }

View File

@ -18,13 +18,16 @@
package org.apache.hadoop.scm.storage; package org.apache.hadoop.scm.storage;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List; import java.util.List;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.XceiverClientSpi;
@ -38,7 +41,7 @@
* instances. This class encapsulates all state management for iterating * instances. This class encapsulates all state management for iterating
* through the sequence of chunks and the sequence of buffers within each chunk. * through the sequence of chunks and the sequence of buffers within each chunk.
*/ */
public class ChunkInputStream extends InputStream { public class ChunkInputStream extends InputStream implements Seekable {
private static final int EOF = -1; private static final int EOF = -1;
@ -47,9 +50,10 @@ public class ChunkInputStream extends InputStream {
private XceiverClientManager xceiverClientManager; private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient; private XceiverClientSpi xceiverClient;
private List<ChunkInfo> chunks; private List<ChunkInfo> chunks;
private int chunkOffset; private int chunkIndex;
private long[] chunkOffset;
private List<ByteBuffer> buffers; private List<ByteBuffer> buffers;
private int bufferOffset; private int bufferIndex;
/** /**
* Creates a new ChunkInputStream. * Creates a new ChunkInputStream.
@ -67,9 +71,21 @@ public ChunkInputStream(String key, XceiverClientManager xceiverClientManager,
this.xceiverClientManager = xceiverClientManager; this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient; this.xceiverClient = xceiverClient;
this.chunks = chunks; this.chunks = chunks;
this.chunkOffset = 0; this.chunkIndex = -1;
// chunkOffset[i] stores offset at which chunk i stores data in
// ChunkInputStream
this.chunkOffset = new long[this.chunks.size()];
initializeChunkOffset();
this.buffers = null; this.buffers = null;
this.bufferOffset = 0; this.bufferIndex = 0;
}
private void initializeChunkOffset() {
int tempOffset = 0;
for (int i = 0; i < chunks.size(); i++) {
chunkOffset[i] = tempOffset;
tempOffset += chunks.get(i).getLen();
}
} }
@Override @Override
@ -77,7 +93,8 @@ public synchronized int read()
throws IOException { throws IOException {
checkOpen(); checkOpen();
int available = prepareRead(1); int available = prepareRead(1);
return available == EOF ? EOF : buffers.get(bufferOffset).get(); return available == EOF ? EOF :
Byte.toUnsignedInt(buffers.get(bufferIndex).get());
} }
@Override @Override
@ -106,7 +123,7 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
if (available == EOF) { if (available == EOF) {
return EOF; return EOF;
} }
buffers.get(bufferOffset).get(b, off, available); buffers.get(bufferIndex).get(b, off, available);
return available; return available;
} }
@ -144,20 +161,20 @@ private synchronized int prepareRead(int len) throws IOException {
return EOF; return EOF;
} else if (buffers == null) { } else if (buffers == null) {
// The first read triggers fetching the first chunk. // The first read triggers fetching the first chunk.
readChunkFromContainer(0); readChunkFromContainer();
} else if (!buffers.isEmpty() && } else if (!buffers.isEmpty() &&
buffers.get(bufferOffset).hasRemaining()) { buffers.get(bufferIndex).hasRemaining()) {
// Data is available from the current buffer. // Data is available from the current buffer.
ByteBuffer bb = buffers.get(bufferOffset); ByteBuffer bb = buffers.get(bufferIndex);
return len > bb.remaining() ? bb.remaining() : len; return len > bb.remaining() ? bb.remaining() : len;
} else if (!buffers.isEmpty() && } else if (!buffers.isEmpty() &&
!buffers.get(bufferOffset).hasRemaining() && !buffers.get(bufferIndex).hasRemaining() &&
bufferOffset < buffers.size() - 1) { bufferIndex < buffers.size() - 1) {
// There are additional buffers available. // There are additional buffers available.
++bufferOffset; ++bufferIndex;
} else if (chunkOffset < chunks.size() - 1) { } else if (chunkIndex < chunks.size() - 1) {
// There are additional chunks available. // There are additional chunks available.
readChunkFromContainer(chunkOffset + 1); readChunkFromContainer();
} else { } else {
// All available input has been consumed. // All available input has been consumed.
return EOF; return EOF;
@ -170,20 +187,75 @@ private synchronized int prepareRead(int len) throws IOException {
* successful, then the data of the read chunk is saved so that its bytes can * successful, then the data of the read chunk is saved so that its bytes can
* be returned from subsequent read calls. * be returned from subsequent read calls.
* *
* @param readChunkOffset offset in the chunk list of which chunk to read
* @throws IOException if there is an I/O error while performing the call * @throws IOException if there is an I/O error while performing the call
*/ */
private synchronized void readChunkFromContainer(int readChunkOffset) private synchronized void readChunkFromContainer() throws IOException {
throws IOException { // On every chunk read chunkIndex should be increased so as to read the
// next chunk
chunkIndex += 1;
final ReadChunkResponseProto readChunkResponse; final ReadChunkResponseProto readChunkResponse;
try { try {
readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient, readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
chunks.get(readChunkOffset), key, traceID); chunks.get(chunkIndex), key, traceID);
} catch (IOException e) { } catch (IOException e) {
throw new IOException("Unexpected OzoneException: " + e.toString(), e); throw new IOException("Unexpected OzoneException: " + e.toString(), e);
} }
chunkOffset = readChunkOffset;
ByteString byteString = readChunkResponse.getData(); ByteString byteString = readChunkResponse.getData();
buffers = byteString.asReadOnlyByteBufferList(); buffers = byteString.asReadOnlyByteBufferList();
bufferIndex = 0;
}
@Override
public synchronized void seek(long pos) throws IOException {
if (pos < 0 || (chunks.size() == 0 && pos > 0)
|| pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1)
.getLen()) {
throw new EOFException(
"EOF encountered pos: " + pos + " container key: " + key);
}
if (chunkIndex == -1) {
chunkIndex = Arrays.binarySearch(chunkOffset, pos);
} else if (pos < chunkOffset[chunkIndex]) {
chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos);
} else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex)
.getLen()) {
chunkIndex =
Arrays.binarySearch(chunkOffset, chunkIndex + 1, chunks.size(), pos);
}
if (chunkIndex < 0) {
// Binary search returns -insertionPoint - 1 if element is not present
// in the array. insertionPoint is the point at which element would be
// inserted in the sorted array. We need to adjust the chunkIndex
// accordingly so that chunkIndex = insertionPoint - 1
chunkIndex = -chunkIndex -2;
}
// adjust chunkIndex so that readChunkFromContainer reads the correct chunk
chunkIndex -= 1;
readChunkFromContainer();
adjustBufferIndex(pos);
}
private void adjustBufferIndex(long pos) {
long tempOffest = chunkOffset[chunkIndex];
for (int i = 0; i < buffers.size(); i++) {
if (pos - tempOffest >= buffers.get(i).capacity()) {
tempOffest += buffers.get(i).capacity();
} else {
bufferIndex = i;
break;
}
}
buffers.get(bufferIndex).position((int) (pos - tempOffest));
}
@Override
public synchronized long getPos() throws IOException {
return chunkIndex == -1 ? 0 :
chunkOffset[chunkIndex] + buffers.get(bufferIndex).position();
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
} }
} }

View File

@ -206,6 +206,10 @@ public StorageContainerManager getStorageContainerManager() {
return this.scm; return this.scm;
} }
public OzoneConfiguration getConf() {
return conf;
}
@Override @Override
public KeySpaceManager getKeySpaceManager() { public KeySpaceManager getKeySpaceManager() {
return this.ksm; return this.ksm;

View File

@ -19,14 +19,15 @@
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream; import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
import org.apache.hadoop.scm.storage.ChunkInputStream;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -111,13 +112,44 @@ public void testErrorWriteGroupOutputStream() throws Exception {
@Test @Test
public void testReadGroupInputStream() throws Exception { public void testReadGroupInputStream() throws Exception {
try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) { try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
ArrayList<InputStream> inputStreams = new ArrayList<>(); ArrayList<ChunkInputStream> inputStreams = new ArrayList<>();
String dataString = RandomStringUtils.randomAscii(500); String dataString = RandomStringUtils.randomAscii(500);
byte[] buf = dataString.getBytes(); byte[] buf = dataString.getBytes();
int offset = 0; int offset = 0;
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
ByteArrayInputStream in = new ByteArrayInputStream(buf, offset, 100); int tempOffset = offset;
ChunkInputStream in =
new ChunkInputStream(null, null, null, new ArrayList<>(), null) {
private ByteArrayInputStream in =
new ByteArrayInputStream(buf, tempOffset, 100);
@Override
public void seek(long pos) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public long getPos() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean seekToNewSource(long targetPos)
throws IOException {
throw new UnsupportedOperationException();
}
@Override
public int read() throws IOException {
return in.read();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return in.read(b, off, len);
}
};
inputStreams.add(in); inputStreams.add(in);
offset += 100; offset += 100;
groupInputStream.addStream(in, 100); groupInputStream.addStream(in, 100);
@ -134,13 +166,44 @@ public void testReadGroupInputStream() throws Exception {
@Test @Test
public void testErrorReadGroupInputStream() throws Exception { public void testErrorReadGroupInputStream() throws Exception {
try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) { try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
ArrayList<InputStream> inputStreams = new ArrayList<>(); ArrayList<ChunkInputStream> inputStreams = new ArrayList<>();
String dataString = RandomStringUtils.randomAscii(500); String dataString = RandomStringUtils.randomAscii(500);
byte[] buf = dataString.getBytes(); byte[] buf = dataString.getBytes();
int offset = 0; int offset = 0;
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
ByteArrayInputStream in = new ByteArrayInputStream(buf, offset, 100); int tempOffset = offset;
ChunkInputStream in =
new ChunkInputStream(null, null, null, new ArrayList<>(), null) {
private ByteArrayInputStream in =
new ByteArrayInputStream(buf, tempOffset, 100);
@Override
public void seek(long pos) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public long getPos() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean seekToNewSource(long targetPos)
throws IOException {
throw new UnsupportedOperationException();
}
@Override
public int read() throws IOException {
return in.read();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return in.read(b, off, len);
}
};
inputStreams.add(in); inputStreams.add(in);
offset += 100; offset += 100;
groupInputStream.addStream(in, 100); groupInputStream.addStream(in, 100);

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ozone;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
import java.io.IOException;
import java.io.InputStream;
/**
* The input stream for Ozone file system.
*
* TODO: Make inputStream generic for both rest and rpc clients
* This class is not thread safe.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class OzoneFSInputStream extends FSInputStream {
private final ChunkGroupInputStream inputStream;
public OzoneFSInputStream(InputStream inputStream) {
this.inputStream = (ChunkGroupInputStream)inputStream;
}
@Override
public int read() throws IOException {
return inputStream.read();
}
@Override
public synchronized void close() throws IOException {
inputStream.close();
}
@Override
public void seek(long pos) throws IOException {
inputStream.seek(pos);
}
@Override
public long getPos() throws IOException {
return inputStream.getPos();
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@Override
public int available() throws IOException {
return inputStream.available();
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ozone;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
/**
* The output stream for Ozone file system.
*
* TODO: Make outputStream generic for both rest and rpc clients
* This class is not thread safe.
*/
public class OzoneFSOutputStream extends OutputStream {
private final ChunkGroupOutputStream outputStream;
public OzoneFSOutputStream(OutputStream outputStream) {
this.outputStream = (ChunkGroupOutputStream)outputStream;
}
@Override
public void write(int b) throws IOException {
outputStream.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
outputStream.write(b, off, len);
}
@Override
public synchronized void flush() throws IOException {
outputStream.flush();
}
@Override
public synchronized void close() throws IOException {
outputStream.close();
}
}

View File

@ -18,16 +18,15 @@
package org.apache.hadoop.fs.ozone; package org.apache.hadoop.fs.ozone;
import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.text.ParseException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Iterator;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
@ -35,12 +34,18 @@
import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.ozone.web.client.OzoneKey; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.web.client.OzoneRestClient; import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.ReplicationFactor;
import org.apache.hadoop.ozone.client.ReplicationType;
import org.apache.http.client.utils.URIBuilder; import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -50,19 +55,16 @@
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ozone.web.client.OzoneBucket;
import org.apache.hadoop.ozone.web.client.OzoneVolume;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER; import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_SCHEME; import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_SCHEME;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR; import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_HTTP_SCHEME;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_DELIMITER; import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_DELIMITER;
import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE; import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
/** /**
* The Ozone Filesystem implementation. * The Ozone Filesystem implementation.
@ -78,11 +80,15 @@ public class OzoneFileSystem extends FileSystem {
static final Logger LOG = LoggerFactory.getLogger(OzoneFileSystem.class); static final Logger LOG = LoggerFactory.getLogger(OzoneFileSystem.class);
/** The Ozone client for connecting to Ozone server. */ /** The Ozone client for connecting to Ozone server. */
private OzoneRestClient ozone; private OzoneClient ozoneClient;
private ObjectStore objectStore;
private OzoneVolume volume;
private OzoneBucket bucket; private OzoneBucket bucket;
private URI uri; private URI uri;
private String userName; private String userName;
private Path workingDir; private Path workingDir;
private ReplicationType replicationType;
private ReplicationFactor replicationFactor;
@Override @Override
public void initialize(URI name, Configuration conf) throws IOException { public void initialize(URI name, Configuration conf) throws IOException {
@ -115,23 +121,24 @@ public void initialize(URI name, Configuration conf) throws IOException {
.setPath(OZONE_URI_DELIMITER + volumeStr + OZONE_URI_DELIMITER .setPath(OZONE_URI_DELIMITER + volumeStr + OZONE_URI_DELIMITER
+ bucketStr + OZONE_URI_DELIMITER).build(); + bucketStr + OZONE_URI_DELIMITER).build();
LOG.trace("Ozone URI for ozfs initialization is " + uri); LOG.trace("Ozone URI for ozfs initialization is " + uri);
this.ozone = new OzoneRestClient(OZONE_HTTP_SCHEME + hostStr); this.ozoneClient = OzoneClientFactory.getRpcClient(conf);
objectStore = ozoneClient.getObjectStore();
this.volume = objectStore.getVolume(volumeStr);
this.bucket = volume.getBucket(bucketStr);
this.replicationType = ReplicationType.valueOf(
conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT));
this.replicationFactor = ReplicationFactor.valueOf(
conf.getInt(OzoneConfigKeys.OZONE_REPLICATION,
OzoneConfigKeys.OZONE_REPLICATION_DEFAULT));
try { try {
this.userName = this.userName =
UserGroupInformation.getCurrentUser().getShortUserName(); UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException e) { } catch (IOException e) {
this.userName = OZONE_DEFAULT_USER; this.userName = OZONE_DEFAULT_USER;
} }
this.ozone.setUserAuth(userName);
OzoneVolume volume = ozone.getVolume(volumeStr);
this.bucket = volume.getBucket(bucketStr);
this.workingDir = new Path(OZONE_USER_DIR, this.userName) this.workingDir = new Path(OZONE_USER_DIR, this.userName)
.makeQualified(this.uri, this.workingDir); .makeQualified(this.uri, this.workingDir);
} catch (OzoneException oe) {
final String msg = "Ozone server exception when initializing file system";
LOG.error(msg, oe);
throw new IOException(msg, oe);
} catch (URISyntaxException ue) { } catch (URISyntaxException ue) {
final String msg = "Invalid Ozone endpoint " + name; final String msg = "Invalid Ozone endpoint " + name;
LOG.error(msg, ue); LOG.error(msg, ue);
@ -142,7 +149,7 @@ public void initialize(URI name, Configuration conf) throws IOException {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
try { try {
ozone.close(); ozoneClient.close();
} finally { } finally {
super.close(); super.close();
} }
@ -162,14 +169,13 @@ public String getScheme() {
public FSDataInputStream open(Path f, int bufferSize) throws IOException { public FSDataInputStream open(Path f, int bufferSize) throws IOException {
LOG.trace("open() path:{}", f); LOG.trace("open() path:{}", f);
final FileStatus fileStatus = getFileStatus(f); final FileStatus fileStatus = getFileStatus(f);
final String key = pathToKey(f);
if (fileStatus.isDirectory()) { if (fileStatus.isDirectory()) {
throw new FileNotFoundException("Can't open directory " + f + " to read"); throw new FileNotFoundException("Can't open directory " + f + " to read");
} }
return new FSDataInputStream( return new FSDataInputStream(
new OzoneInputStream(getConf(), uri, bucket, pathToKey(f), new OzoneFSInputStream(bucket.readKey(key).getInputStream()));
fileStatus.getLen(), bufferSize, statistics));
} }
@Override @Override
@ -206,11 +212,12 @@ public FSDataOutputStream create(Path f, FsPermission permission,
// does not exists and a new file can thus be created. // does not exists and a new file can thus be created.
} }
final OzoneOutputStream stream = OzoneOutputStream ozoneOutputStream =
new OzoneOutputStream(getConf(), uri, bucket, key, this.statistics); bucket.createKey(key, 0, replicationType, replicationFactor);
// We pass null to FSDataOutputStream so it won't count writes that // We pass null to FSDataOutputStream so it won't count writes that
// are being buffered to a file // are being buffered to a file
return new FSDataOutputStream(stream, null); return new FSDataOutputStream(
new OzoneFSOutputStream(ozoneOutputStream.getOutputStream()), null);
} }
@Override @Override
@ -245,7 +252,7 @@ private class RenameIterator extends OzoneListingIterator {
RenameIterator(Path srcPath, Path dstPath) RenameIterator(Path srcPath, Path dstPath)
throws IOException { throws IOException {
super(srcPath, true); super(srcPath);
srcKey = pathToKey(srcPath); srcKey = pathToKey(srcPath);
dstKey = pathToKey(dstPath); dstKey = pathToKey(dstPath);
LOG.trace("rename from:{} to:{}", srcKey, dstKey); LOG.trace("rename from:{} to:{}", srcKey, dstKey);
@ -253,30 +260,17 @@ private class RenameIterator extends OzoneListingIterator {
boolean processKey(String key) throws IOException { boolean processKey(String key) throws IOException {
String newKeyName = dstKey.concat(key.substring(srcKey.length())); String newKeyName = dstKey.concat(key.substring(srcKey.length()));
return rename(key, newKeyName); rename(key, newKeyName);
}
// TODO: currently rename work by copying the file, with changes in KSM,
// this operation can be made improved by renaming the keys in KSM directly.
private boolean rename(String src, String dst) throws IOException {
final LocalDirAllocator dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
final File tmpFile = dirAlloc.createTmpFileForWrite("output-",
LocalDirAllocator.SIZE_UNKNOWN, getConf());
try {
LOG.trace("rename by copying file from:{} to:{}", src, dst);
bucket.getKey(src, tmpFile.toPath());
bucket.putKey(dst, tmpFile);
return true; return true;
} catch (OzoneException oe) {
String msg = String.format("Error when renaming key from:%s to:%s",
src, dst);
LOG.error(msg, oe);
throw new IOException(msg, oe);
} finally {
if (!tmpFile.delete()) {
LOG.warn("Can not delete tmpFile: " + tmpFile);
} }
// TODO: currently rename work by copying the streams, with changes in KSM,
// this operation can be improved by renaming the keys in KSM directly.
private void rename(String src, String dst) throws IOException {
try (OzoneInputStream inputStream = bucket.readKey(src);
OzoneOutputStream outputStream = bucket
.createKey(dst, 0, replicationType, replicationFactor)) {
IOUtils.copyBytes(inputStream, outputStream, getConf());
} }
} }
} }
@ -386,8 +380,13 @@ private class DeleteIterator extends OzoneListingIterator {
private boolean recursive; private boolean recursive;
DeleteIterator(Path f, boolean recursive) DeleteIterator(Path f, boolean recursive)
throws IOException { throws IOException {
super(f, recursive); super(f);
this.recursive = recursive; this.recursive = recursive;
if (getStatus().isDirectory()
&& !this.recursive
&& listStatus(f).length != 0) {
throw new PathIsNotEmptyDirectoryException(f.toString());
}
} }
boolean processKey(String key) throws IOException { boolean processKey(String key) throws IOException {
@ -421,7 +420,7 @@ private class ListStatusIterator extends OzoneListingIterator {
private Path f; private Path f;
ListStatusIterator(Path f) throws IOException { ListStatusIterator(Path f) throws IOException {
super(f, true); super(f);
this.f = f; this.f = f;
} }
@ -532,8 +531,7 @@ public FileStatus getFileStatus(Path f) throws IOException {
if (key.length() == 0) { if (key.length() == 0) {
return new FileStatus(0, true, 1, 0, return new FileStatus(0, true, 1, 0,
getModifiedTime(bucket.getCreatedOn(), OZONE_URI_DELIMITER), bucket.getCreationTime(), qualifiedPath);
qualifiedPath);
} }
// consider this a file and get key status // consider this a file and get key status
@ -548,14 +546,11 @@ public FileStatus getFileStatus(Path f) throws IOException {
throw new FileNotFoundException(f + ": No such file or directory!"); throw new FileNotFoundException(f + ": No such file or directory!");
} else if (isDirectory(meta)) { } else if (isDirectory(meta)) {
return new FileStatus(0, true, 1, 0, return new FileStatus(0, true, 1, 0,
getModifiedTime(meta.getObjectInfo().getModifiedOn(), key), meta.getModificationTime(), qualifiedPath);
qualifiedPath);
} else { } else {
//TODO: Fetch replication count from ratis config //TODO: Fetch replication count from ratis config
return new FileStatus(meta.getObjectInfo().getSize(), false, 1, return new FileStatus(meta.getDataSize(), false, 1,
getDefaultBlockSize(f), getDefaultBlockSize(f), meta.getModificationTime(), qualifiedPath);
getModifiedTime(meta.getObjectInfo().getModifiedOn(), key),
qualifiedPath);
} }
} }
@ -566,54 +561,23 @@ public FileStatus getFileStatus(Path f) throws IOException {
*/ */
private OzoneKey getKeyInfo(String key) { private OzoneKey getKeyInfo(String key) {
try { try {
return bucket.getKeyInfo(key); return bucket.getKey(key);
} catch (OzoneException e) { } catch (IOException e) {
LOG.trace("Key:{} does not exists", key); LOG.trace("Key:{} does not exists", key);
return null; return null;
} }
} }
/**
* Helper method to get the modified time of the key.
* @param key key to fetch the modified time
* @return last modified time of the key
*/
private long getModifiedTime(String modifiedTime, String key) {
try {
return OzoneUtils.formatDate(modifiedTime);
} catch (ParseException pe) {
LOG.error("Invalid time:{} for key:{}", modifiedTime, key, pe);
return 0;
}
}
/** /**
* Helper method to check if an Ozone key is representing a directory. * Helper method to check if an Ozone key is representing a directory.
* @param key key to be checked as a directory * @param key key to be checked as a directory
* @return true if key is a directory, false otherwise * @return true if key is a directory, false otherwise
*/ */
private boolean isDirectory(OzoneKey key) { private boolean isDirectory(OzoneKey key) {
LOG.trace("key name:{} size:{}", key.getObjectInfo().getKeyName(), LOG.trace("key name:{} size:{}", key.getName(),
key.getObjectInfo().getSize()); key.getDataSize());
return key.getObjectInfo().getKeyName().endsWith(OZONE_URI_DELIMITER) return key.getName().endsWith(OZONE_URI_DELIMITER)
&& (key.getObjectInfo().getSize() == 0); && (key.getDataSize() == 0);
}
/**
* Helper method to list entries matching the key name in bucket.
* @param dirKey key prefix for listing the keys
* @param lastKey last iterated key
* @return List of Keys
*/
List<OzoneKey> listKeys(String dirKey, String lastKey)
throws IOException {
LOG.trace("list keys dirKey:{} lastKey:{}", dirKey, lastKey);
try {
return bucket.listKeys(dirKey, LISTING_PAGE_SIZE, lastKey);
} catch (OzoneException oe) {
LOG.error("list keys failed dirKey:{} lastKey:{}", dirKey, lastKey, oe);
throw new IOException("List keys failed " + oe.getMessage());
}
} }
/** /**
@ -623,11 +587,11 @@ List<OzoneKey> listKeys(String dirKey, String lastKey)
*/ */
private boolean createDirectory(String keyName) { private boolean createDirectory(String keyName) {
try { try {
LOG.trace("creating dir for key:{}", keyName); LOG.info("creating dir for key:{}", keyName);
bucket.putKey(keyName, ""); bucket.createKey(keyName, 0, replicationType, replicationFactor).close();
return true; return true;
} catch (OzoneException oe) { } catch (IOException ioe) {
LOG.error("create key failed for key:{}", keyName, oe); LOG.error("create key failed for key:{}", keyName, ioe);
return false; return false;
} }
} }
@ -642,8 +606,8 @@ private boolean deleteObject(String keyName) {
try { try {
bucket.deleteKey(keyName); bucket.deleteKey(keyName);
return true; return true;
} catch (OzoneException oe) { } catch (IOException ioe) {
LOG.error("delete key failed " + oe.getMessage()); LOG.error("delete key failed " + ioe.getMessage());
return false; return false;
} }
} }
@ -671,7 +635,7 @@ public String pathToKey(Path path) {
* @param key the ozone Key which needs to be appended * @param key the ozone Key which needs to be appended
* @return delimiter appended key * @return delimiter appended key
*/ */
String addTrailingSlashIfNeeded(String key) { private String addTrailingSlashIfNeeded(String key) {
if (StringUtils.isNotEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) { if (StringUtils.isNotEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
return key + OZONE_URI_DELIMITER; return key + OZONE_URI_DELIMITER;
} else { } else {
@ -690,47 +654,36 @@ public String toString() {
private abstract class OzoneListingIterator { private abstract class OzoneListingIterator {
private final Path path; private final Path path;
private final boolean recursive;
private final FileStatus status; private final FileStatus status;
private String pathKey; private String pathKey;
private Iterator<OzoneKey> keyIterator;
OzoneListingIterator(Path path, boolean recursive) OzoneListingIterator(Path path)
throws IOException { throws IOException {
this.path = path; this.path = path;
this.recursive = recursive;
this.status = getFileStatus(path); this.status = getFileStatus(path);
this.pathKey = pathToKey(path); this.pathKey = pathToKey(path);
if (status.isDirectory()) { if (status.isDirectory()) {
this.pathKey = addTrailingSlashIfNeeded(pathKey); this.pathKey = addTrailingSlashIfNeeded(pathKey);
} }
keyIterator = bucket.listKeys(pathKey);
} }
abstract boolean processKey(String key) throws IOException; abstract boolean processKey(String key) throws IOException;
// iterates all the keys in the particular path // iterates all the keys in the particular path
boolean iterate() throws IOException { boolean iterate() throws IOException {
LOG.trace("Iterating path {} - recursive {}", path, recursive); LOG.trace("Iterating path {}", path);
if (status.isDirectory()) { if (status.isDirectory()) {
LOG.trace("Iterating directory:{}", pathKey); LOG.trace("Iterating directory:{}", pathKey);
String lastKey = pathKey; while (keyIterator.hasNext()) {
while (true) { OzoneKey key = keyIterator.next();
List<OzoneKey> ozoneKeys = listKeys(pathKey, lastKey); LOG.info("iterating key:{}", key.getName());
LOG.trace("number of sub keys:{}", ozoneKeys.size()); if (!processKey(key.getName())) {
if (ozoneKeys.size() == 0) {
return processKey(pathKey);
} else {
if (!recursive) {
throw new PathIsNotEmptyDirectoryException(path.toString());
} else {
for (OzoneKey ozoneKey : ozoneKeys) {
lastKey = ozoneKey.getObjectInfo().getKeyName();
if (!processKey(lastKey)) {
return false; return false;
} }
} }
} return true;
}
}
} else { } else {
LOG.trace("iterating file:{}", path); LOG.trace("iterating file:{}", path);
return processKey(pathKey); return processKey(pathKey);
@ -744,5 +697,9 @@ String getPathKey() {
boolean pathIsDirectory() { boolean pathIsDirectory() {
return status.isDirectory(); return status.isDirectory();
} }
FileStatus getStatus() {
return status;
}
} }
} }

View File

@ -1,191 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ozone;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URI;
import java.util.Objects;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.ozone.web.client.OzoneBucket;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import static org.apache.hadoop.fs.ozone.Constants.BUFFER_TMP_KEY;
import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
/**
* Wraps OzoneInputStream implementation.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class OzoneInputStream extends FSInputStream {
private static final Log LOG = LogFactory.getLog(OzoneInputStream.class);
private final RandomAccessFile in;
/** Closed bit. Volatile so reads are non-blocking. */
private volatile boolean closed = false;
/** the ozone bucket client. */
private final OzoneBucket bucket;
/** The object key. */
private final String key;
/** Object content length. */
private final long contentLen;
/** file system stats. */
private final Statistics stats;
private final URI keyUri;
OzoneInputStream(Configuration conf, URI fsUri, OzoneBucket bucket,
String key, long contentLen, int bufferSize, Statistics statistics)
throws IOException {
Objects.requireNonNull(bucket, "bucket can not be null!");
Objects.requireNonNull(key, "kenName can not be null!");
this.bucket = bucket;
this.key = key;
this.contentLen = contentLen;
this.stats = statistics;
this.keyUri = fsUri.resolve(key);
if (conf.get(BUFFER_DIR_KEY) == null) {
conf.set(BUFFER_DIR_KEY, conf.get(BUFFER_TMP_KEY) + "/ozone");
}
final LocalDirAllocator dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
final File tmpFile = dirAlloc.createTmpFileForWrite("output-",
LocalDirAllocator.SIZE_UNKNOWN, conf);
try {
LOG.trace("Get Key:" + this.keyUri + " tmp-file:" + tmpFile.toPath());
bucket.getKey(this.key, tmpFile.toPath());
in = new RandomAccessFile(tmpFile, "r");
statistics.incrementReadOps(1);
} catch (OzoneException oe) {
final String msg = "Error when getBytes for key = " + key;
LOG.error(msg, oe);
throw new IOException(msg, oe);
}
}
@Override
public synchronized void seek(long targetPos) throws IOException {
checkNotClosed();
// Do not allow negative seek
if (targetPos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + targetPos);
}
if (this.contentLen <= 0) {
return;
}
in.seek(targetPos);
}
@Override
public synchronized long getPos() throws IOException {
checkNotClosed();
return in.getFilePointer();
}
@Override
public boolean seekToNewSource(long l) throws IOException {
return false;
}
@Override
public synchronized int read() throws IOException {
int ch = in.read();
if (stats != null && ch != -1) {
stats.incrementBytesRead(1);
}
return ch;
}
@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
Preconditions.checkArgument(buffer != null, "buffer can not be null");
int numberOfByteRead = super.read(position, buffer, offset, length);
if (stats != null && numberOfByteRead > 0) {
stats.incrementBytesRead(numberOfByteRead);
}
return numberOfByteRead;
}
@Override
public synchronized int read(byte[] buffer, int offset, int length)
throws IOException {
Preconditions.checkArgument(buffer != null, "buffer can not be null");
int numberOfByteRead = in.read(buffer, offset, length);
if (stats != null && numberOfByteRead > 0) {
stats.incrementBytesRead(numberOfByteRead);
}
return numberOfByteRead;
}
@Override
public synchronized int available() throws IOException {
checkNotClosed();
final long remainingInWrapped = contentLen - in.getFilePointer();
return (remainingInWrapped < Integer.MAX_VALUE)
? (int)remainingInWrapped
: Integer.MAX_VALUE;
}
@Override
public synchronized void close() throws IOException {
in.close();
}
@Override
public synchronized long skip(long pos) throws IOException {
return in.skipBytes((int) pos);
}
/**
* Verify that the input stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
*/
private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException(this.keyUri + ": "
+ FSExceptionMessages.STREAM_IS_CLOSED);
}
}
}

View File

@ -1,113 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ozone;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.ozone.web.client.OzoneBucket;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
import static org.apache.hadoop.fs.ozone.Constants.BUFFER_TMP_KEY;
/**
* The output stream for Ozone file system.
*
* Data will be buffered on local disk, then uploaded to Ozone in
* {@link #close()} method.
*
* This class is not thread safe.
*/
public class OzoneOutputStream extends OutputStream {
private static final Log LOG = LogFactory.getLog(OzoneOutputStream.class);
private OzoneBucket bucket;
private final String key;
private final URI keyUri;
private Statistics statistics;
private LocalDirAllocator dirAlloc;
private boolean closed;
private File tmpFile;
private BufferedOutputStream backupStream;
OzoneOutputStream(Configuration conf, URI fsUri, OzoneBucket bucket,
String key, Statistics statistics) throws IOException {
this.bucket = bucket;
this.key = key;
this.keyUri = fsUri.resolve(key);
this.statistics = statistics;
if (conf.get(BUFFER_DIR_KEY) == null) {
conf.set(BUFFER_DIR_KEY, conf.get(BUFFER_TMP_KEY) + "/ozone");
}
dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
tmpFile = dirAlloc.createTmpFileForWrite("output-",
LocalDirAllocator.SIZE_UNKNOWN, conf);
backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
closed = false;
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
closed = true;
if (backupStream != null) {
backupStream.close();
}
try {
LOG.trace("Put tmp-file:" + tmpFile + " to key "+ keyUri);
bucket.putKey(key, tmpFile);
statistics.incrementWriteOps(1);
} catch (OzoneException oe) {
final String msg = "Uploading error: file=" + tmpFile + ", key=" + key;
LOG.error(msg, oe);
throw new IOException(msg, oe);
} finally {
if (!tmpFile.delete()) {
LOG.warn("Can not delete tmpFile: " + tmpFile);
}
}
}
@Override
public synchronized void flush() throws IOException {
backupStream.flush();
}
@Override
public synchronized void write(int b) throws IOException {
backupStream.write(b);
statistics.incrementBytesWritten(1);
}
}

View File

@ -114,7 +114,9 @@ public void testOzFsReadWrite() throws IOException {
} }
FileStatus status = fs.getFileStatus(path); FileStatus status = fs.getFileStatus(path);
Assert.assertTrue(status.getModificationTime() < currentTime); // The timestamp of the newly created file should always be greater than
// the time when the test was started
Assert.assertTrue(status.getModificationTime() > currentTime);
try (FSDataInputStream inputStream = fs.open(path)) { try (FSDataInputStream inputStream = fs.open(path)) {
byte[] buffer = new byte[stringLen]; byte[] buffer = new byte[stringLen];

View File

@ -35,6 +35,8 @@
import org.apache.hadoop.ozone.web.handlers.VolumeArgs; import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler; import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.junit.Assert; import org.junit.Assert;
import java.io.IOException; import java.io.IOException;
@ -76,6 +78,10 @@ public static void createCluster() throws IOException {
storageHandler = new ObjectStoreHandler(conf).getStorageHandler(); storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
} }
private void copyClusterConfigs(String configKey) {
getConf().set(configKey, cluster.getConf().get(configKey));
}
@Override @Override
public FileSystem getTestFileSystem() throws IOException { public FileSystem getTestFileSystem() throws IOException {
//assumes cluster is not null //assumes cluster is not null
@ -95,8 +101,6 @@ public FileSystem getTestFileSystem() throws IOException {
BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs); BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
try { try {
storageHandler.createVolume(volumeArgs); storageHandler.createVolume(volumeArgs);
storageHandler.createBucket(bucketArgs); storageHandler.createBucket(bucketArgs);
} catch (OzoneException e) { } catch (OzoneException e) {
throw new IOException(e.getMessage()); throw new IOException(e.getMessage());
@ -107,6 +111,8 @@ public FileSystem getTestFileSystem() throws IOException {
String uri = String.format("%s://localhost:%d/%s/%s", String uri = String.format("%s://localhost:%d/%s/%s",
Constants.OZONE_URI_SCHEME, port, volumeName, bucketName); Constants.OZONE_URI_SCHEME, port, volumeName, bucketName);
getConf().set("fs.defaultFS", uri); getConf().set("fs.defaultFS", uri);
copyClusterConfigs(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY);
copyClusterConfigs(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
return FileSystem.get(getConf()); return FileSystem.get(getConf());
} }