HDDS-693. Support multi-chunk signatures in s3g PUT object endpoint. Contributed by Elek Marton.
This commit is contained in:
parent
74a5e683fe
commit
ebf8e1731d
|
@ -0,0 +1,99 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.s3;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Input stream implementation to read body with chunked signatures.
|
||||||
|
* <p>
|
||||||
|
* see: https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
|
||||||
|
*/
|
||||||
|
public class SignedChunksInputStream extends InputStream {
|
||||||
|
|
||||||
|
private Pattern signatureLinePattern =
|
||||||
|
Pattern.compile("([0-9A-Fa-f]+);chunk-signature=.*");
|
||||||
|
|
||||||
|
private InputStream originalStream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Numer of following databits. If zero, the signature line should be parsed.
|
||||||
|
*/
|
||||||
|
private int remainingData = 0;
|
||||||
|
|
||||||
|
public SignedChunksInputStream(InputStream inputStream) {
|
||||||
|
originalStream = inputStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read() throws IOException {
|
||||||
|
if (remainingData > 0) {
|
||||||
|
int curr = originalStream.read();
|
||||||
|
remainingData--;
|
||||||
|
if (remainingData == 0) {
|
||||||
|
//read the "\r\n" at the end of the data section
|
||||||
|
originalStream.read();
|
||||||
|
originalStream.read();
|
||||||
|
}
|
||||||
|
return curr;
|
||||||
|
} else {
|
||||||
|
remainingData = readHeader();
|
||||||
|
if (remainingData == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int readHeader() throws IOException {
|
||||||
|
int prev = -1;
|
||||||
|
int curr = 0;
|
||||||
|
StringBuilder buf = new StringBuilder();
|
||||||
|
|
||||||
|
//read everything until the next \r\n
|
||||||
|
while (!eol(prev, curr) && curr != -1) {
|
||||||
|
int next = originalStream.read();
|
||||||
|
if (next != -1) {
|
||||||
|
buf.append((char) next);
|
||||||
|
}
|
||||||
|
prev = curr;
|
||||||
|
curr = next;
|
||||||
|
}
|
||||||
|
String signatureLine = buf.toString().trim();
|
||||||
|
if (signatureLine.length() == 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
//parse the data length.
|
||||||
|
Matcher matcher = signatureLinePattern.matcher(signatureLine);
|
||||||
|
if (matcher.matches()) {
|
||||||
|
return Integer.parseInt(matcher.group(1), 16);
|
||||||
|
} else {
|
||||||
|
throw new IOException("Invalid signature line: " + signatureLine);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean eol(int prev, int curr) {
|
||||||
|
return prev == 13 && curr == 10;
|
||||||
|
}
|
||||||
|
}
|
|
@ -50,9 +50,11 @@ import org.apache.hadoop.ozone.client.OzoneBucket;
|
||||||
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
|
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
|
||||||
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
||||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
||||||
|
import org.apache.hadoop.ozone.s3.SignedChunksInputStream;
|
||||||
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
|
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
|
||||||
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
|
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.http.HttpStatus;
|
import org.apache.http.HttpStatus;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -67,6 +69,9 @@ public class ObjectEndpoint extends EndpointBase {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(ObjectEndpoint.class);
|
LoggerFactory.getLogger(ObjectEndpoint.class);
|
||||||
|
|
||||||
|
@Context
|
||||||
|
private HttpHeaders headers;
|
||||||
|
|
||||||
private List<String> customizableGetHeaders = new ArrayList<>();
|
private List<String> customizableGetHeaders = new ArrayList<>();
|
||||||
|
|
||||||
public ObjectEndpoint() {
|
public ObjectEndpoint() {
|
||||||
|
@ -86,7 +91,6 @@ public class ObjectEndpoint extends EndpointBase {
|
||||||
*/
|
*/
|
||||||
@PUT
|
@PUT
|
||||||
public Response put(
|
public Response put(
|
||||||
@Context HttpHeaders headers,
|
|
||||||
@PathParam("bucket") String bucketName,
|
@PathParam("bucket") String bucketName,
|
||||||
@PathParam("path") String keyPath,
|
@PathParam("path") String keyPath,
|
||||||
@DefaultValue("STAND_ALONE") @QueryParam("replicationType")
|
@DefaultValue("STAND_ALONE") @QueryParam("replicationType")
|
||||||
|
@ -106,6 +110,11 @@ public class ObjectEndpoint extends EndpointBase {
|
||||||
OzoneOutputStream output = bucket
|
OzoneOutputStream output = bucket
|
||||||
.createKey(keyPath, length, replicationType, replicationFactor);
|
.createKey(keyPath, length, replicationType, replicationFactor);
|
||||||
|
|
||||||
|
if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
|
||||||
|
.equals(headers.getHeaderString("x-amz-content-sha256"))) {
|
||||||
|
body = new SignedChunksInputStream(body);
|
||||||
|
}
|
||||||
|
|
||||||
IOUtils.copy(body, output);
|
IOUtils.copy(body, output);
|
||||||
output.close();
|
output.close();
|
||||||
|
|
||||||
|
@ -125,7 +134,6 @@ public class ObjectEndpoint extends EndpointBase {
|
||||||
*/
|
*/
|
||||||
@GET
|
@GET
|
||||||
public Response get(
|
public Response get(
|
||||||
@Context HttpHeaders headers,
|
|
||||||
@PathParam("bucket") String bucketName,
|
@PathParam("bucket") String bucketName,
|
||||||
@PathParam("path") String keyPath,
|
@PathParam("path") String keyPath,
|
||||||
InputStream body) throws IOException, OS3Exception {
|
InputStream body) throws IOException, OS3Exception {
|
||||||
|
@ -227,4 +235,8 @@ public class ObjectEndpoint extends EndpointBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setHeaders(HttpHeaders headers) {
|
||||||
|
this.headers = headers;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,84 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.s3;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test input stream parsing with signatures.
|
||||||
|
*/
|
||||||
|
public class TestSignedChunksInputStream {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void emptyfile() throws IOException {
|
||||||
|
InputStream is = fileContent("0;chunk-signature"
|
||||||
|
+
|
||||||
|
"=23abb2bd920ddeeaac78a63ed808bc59fa6e7d3ef0e356474b82cdc2f8c93c40");
|
||||||
|
String result = IOUtils.toString(is, Charset.forName("UTF-8"));
|
||||||
|
Assert.assertEquals("", result);
|
||||||
|
|
||||||
|
is = fileContent("0;chunk-signature"
|
||||||
|
+
|
||||||
|
"=23abb2bd920ddeeaac78a63ed808bc59fa6e7d3ef0e356474b82cdc2f8c93c40\r"
|
||||||
|
+ "\n");
|
||||||
|
result = IOUtils.toString(is, Charset.forName("UTF-8"));
|
||||||
|
Assert.assertEquals("", result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void singlechunk() throws IOException {
|
||||||
|
InputStream is = fileContent("0A;chunk-signature"
|
||||||
|
+
|
||||||
|
"=23abb2bd920ddeeaac78a63ed808bc59fa6e7d3ef0e356474b82cdc2f8c93c40\r"
|
||||||
|
+ "\n1234567890\r\n");
|
||||||
|
String result = IOUtils.toString(is, Charset.forName("UTF-8"));
|
||||||
|
Assert.assertEquals("1234567890", result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void singlechunkwithoutend() throws IOException {
|
||||||
|
InputStream is = fileContent("0A;chunk-signature"
|
||||||
|
+
|
||||||
|
"=23abb2bd920ddeeaac78a63ed808bc59fa6e7d3ef0e356474b82cdc2f8c93c40\r"
|
||||||
|
+ "\n1234567890\r\n");
|
||||||
|
String result = IOUtils.toString(is, Charset.forName("UTF-8"));
|
||||||
|
Assert.assertEquals("1234567890", result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void multichunks() throws IOException {
|
||||||
|
InputStream is = fileContent("0a;chunk-signature=signature\r\n"
|
||||||
|
+ "1234567890\r\n"
|
||||||
|
+ "05;chunk-signature=signature\r\n"
|
||||||
|
+ "abcde\r\n");
|
||||||
|
String result = IOUtils.toString(is, Charset.forName("UTF-8"));
|
||||||
|
Assert.assertEquals("1234567890abcde", result);
|
||||||
|
}
|
||||||
|
|
||||||
|
private InputStream fileContent(String content) {
|
||||||
|
return new SignedChunksInputStream(
|
||||||
|
new ByteArrayInputStream(content.getBytes()));
|
||||||
|
}
|
||||||
|
}
|
|
@ -62,11 +62,11 @@ public class TestObjectGet {
|
||||||
ObjectEndpoint rest = new ObjectEndpoint();
|
ObjectEndpoint rest = new ObjectEndpoint();
|
||||||
rest.setClient(client);
|
rest.setClient(client);
|
||||||
HttpHeaders headers = Mockito.mock(HttpHeaders.class);
|
HttpHeaders headers = Mockito.mock(HttpHeaders.class);
|
||||||
|
rest.setHeaders(headers);
|
||||||
ByteArrayInputStream body = new ByteArrayInputStream(CONTENT.getBytes());
|
ByteArrayInputStream body = new ByteArrayInputStream(CONTENT.getBytes());
|
||||||
|
|
||||||
//WHEN
|
//WHEN
|
||||||
rest.get(headers, "b1", "key1", body);
|
rest.get("b1", "key1", body);
|
||||||
|
|
||||||
//THEN
|
//THEN
|
||||||
OzoneInputStream ozoneInputStream =
|
OzoneInputStream ozoneInputStream =
|
||||||
|
|
|
@ -38,6 +38,7 @@ import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test put object.
|
* Test put object.
|
||||||
|
@ -70,11 +71,12 @@ public class TestPutObject {
|
||||||
//GIVEN
|
//GIVEN
|
||||||
HttpHeaders headers = Mockito.mock(HttpHeaders.class);
|
HttpHeaders headers = Mockito.mock(HttpHeaders.class);
|
||||||
ByteArrayInputStream body = new ByteArrayInputStream(CONTENT.getBytes());
|
ByteArrayInputStream body = new ByteArrayInputStream(CONTENT.getBytes());
|
||||||
|
objectEndpoint.setHeaders(headers);
|
||||||
|
|
||||||
//WHEN
|
//WHEN
|
||||||
Response response = objectEndpoint.put(headers, bucketName, keyName,
|
Response response = objectEndpoint.put(bucketName, keyName,
|
||||||
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "32 * 1024 * 1024",
|
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "32 * 1024 * 1024",
|
||||||
CONTENT.length(), body);
|
CONTENT.length(), body);
|
||||||
|
|
||||||
//THEN
|
//THEN
|
||||||
String volumeName = clientStub.getObjectStore()
|
String volumeName = clientStub.getObjectStore()
|
||||||
|
@ -88,4 +90,39 @@ public class TestPutObject {
|
||||||
Assert.assertEquals(200, response.getStatus());
|
Assert.assertEquals(200, response.getStatus());
|
||||||
Assert.assertEquals(CONTENT, keyContent);
|
Assert.assertEquals(CONTENT, keyContent);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPutObjectWithSignedChunks() throws IOException, OS3Exception {
|
||||||
|
//GIVEN
|
||||||
|
HttpHeaders headers = Mockito.mock(HttpHeaders.class);
|
||||||
|
objectEndpoint.setHeaders(headers);
|
||||||
|
|
||||||
|
String chunkedContent = "0a;chunk-signature=signature\r\n"
|
||||||
|
+ "1234567890\r\n"
|
||||||
|
+ "05;chunk-signature=signature\r\n"
|
||||||
|
+ "abcde\r\n";
|
||||||
|
|
||||||
|
when(headers.getHeaderString("x-amz-content-sha256"))
|
||||||
|
.thenReturn("STREAMING-AWS4-HMAC-SHA256-PAYLOAD");
|
||||||
|
|
||||||
|
//WHEN
|
||||||
|
Response response = objectEndpoint.put(bucketName, keyName,
|
||||||
|
ReplicationType.STAND_ALONE,
|
||||||
|
ReplicationFactor.ONE,
|
||||||
|
"32 * 1024 * 1024",
|
||||||
|
chunkedContent.length(),
|
||||||
|
new ByteArrayInputStream(chunkedContent.getBytes()));
|
||||||
|
|
||||||
|
//THEN
|
||||||
|
String volumeName = clientStub.getObjectStore()
|
||||||
|
.getOzoneVolumeName(bucketName);
|
||||||
|
OzoneInputStream ozoneInputStream =
|
||||||
|
clientStub.getObjectStore().getVolume(volumeName).getBucket(bucketName)
|
||||||
|
.readKey(keyName);
|
||||||
|
String keyContent =
|
||||||
|
IOUtils.toString(ozoneInputStream, Charset.forName("UTF-8"));
|
||||||
|
|
||||||
|
Assert.assertEquals(200, response.getStatus());
|
||||||
|
Assert.assertEquals("1234567890abcde", keyContent);
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue