mirror of https://github.com/apache/druid.git
Fix bug while adding `Range` header in HttpEntity (#12215)
Changes: - Add `Range` header to the request before opening the connection - Use header `Content-Range` instead of `Accept-Ranges` as `Content-Range` is guaranteed to be populated if the server is returning a partial response
This commit is contained in:
parent
a3affe1471
commit
290130b1fa
|
@ -89,29 +89,36 @@ public class HttpEntity extends RetryingInputEntity
|
|||
String basicAuthString = "Basic " + Base64.getEncoder().encodeToString(StringUtils.toUtf8(userPass));
|
||||
urlConnection.setRequestProperty("Authorization", basicAuthString);
|
||||
}
|
||||
final String acceptRanges = urlConnection.getHeaderField(HttpHeaders.ACCEPT_RANGES);
|
||||
final boolean withRanges = "bytes".equalsIgnoreCase(acceptRanges);
|
||||
if (withRanges && offset > 0) {
|
||||
// Set header for range request.
|
||||
// Since we need to set only the start offset, the header is "bytes=<range-start>-".
|
||||
// See https://tools.ietf.org/html/rfc7233#section-2.1
|
||||
urlConnection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", offset));
|
||||
// Set header for range request.
|
||||
// Since we need to set only the start offset, the header is "bytes=<range-start>-".
|
||||
// See https://tools.ietf.org/html/rfc7233#section-2.1
|
||||
urlConnection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", offset));
|
||||
final String contentRange = urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE);
|
||||
final boolean withContentRange = contentRange != null && contentRange.startsWith("bytes ");
|
||||
if (withContentRange && offset > 0) {
|
||||
return urlConnection.getInputStream();
|
||||
} else {
|
||||
if (!withRanges && offset > 0) {
|
||||
if (!withContentRange && offset > 0) {
|
||||
LOG.warn(
|
||||
"Since the input source doesn't support range requests, the object input stream is opened from the start and "
|
||||
+ "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message"
|
||||
+ " a lot."
|
||||
);
|
||||
}
|
||||
final InputStream in = urlConnection.getInputStream();
|
||||
final long skipped = in.skip(offset);
|
||||
if (skipped != offset) {
|
||||
throw new ISE("Requested to skip [%s] bytes, but actual number of bytes skipped is [%s]", offset, skipped);
|
||||
InputStream in = urlConnection.getInputStream();
|
||||
try {
|
||||
final long skipped = in.skip(offset);
|
||||
if (skipped != offset) {
|
||||
in.close();
|
||||
throw new ISE("Requested to skip [%s] bytes, but actual number of bytes skipped is [%s]", offset, skipped);
|
||||
} else {
|
||||
return in;
|
||||
}
|
||||
}
|
||||
catch (IOException ex) {
|
||||
in.close();
|
||||
throw ex;
|
||||
}
|
||||
return in;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.data.input.impl;
|
||||
|
||||
import com.google.common.net.HttpHeaders;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.mockito.AdditionalAnswers;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.net.URLConnection;
|
||||
|
||||
public class HttpEntityTest
|
||||
{
|
||||
private URI uri;
|
||||
private URL url;
|
||||
private URLConnection urlConnection;
|
||||
private InputStream inputStreamMock;
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException
|
||||
{
|
||||
uri = Mockito.mock(URI.class);
|
||||
url = Mockito.mock(URL.class);
|
||||
urlConnection = Mockito.mock(URLConnection.class);
|
||||
inputStreamMock = Mockito.mock(InputStream.class);
|
||||
Mockito.when(uri.toURL()).thenReturn(url);
|
||||
Mockito.when(url.openConnection()).thenReturn(urlConnection);
|
||||
Mockito.when(urlConnection.getInputStream()).thenReturn(inputStreamMock);
|
||||
Mockito.when(inputStreamMock.skip(ArgumentMatchers.anyLong())).then(AdditionalAnswers.returnsFirstArg());
|
||||
}
|
||||
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void testOpenInputStream() throws IOException, URISyntaxException
|
||||
{
|
||||
URI url = new URI("https://druid.apache.org/data/wikipedia.json.gz");
|
||||
final InputStream inputStream = HttpEntity.openInputStream(url, "", null, 0);
|
||||
final InputStream inputStreamPartial = HttpEntity.openInputStream(url, "", null, 5);
|
||||
inputStream.skip(5);
|
||||
Assert.assertTrue(IOUtils.contentEquals(inputStream, inputStreamPartial));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithServerSupportingRanges() throws IOException
|
||||
{
|
||||
long offset = 15;
|
||||
String contentRange = StringUtils.format("bytes %d-%d/%d", offset, 1000, 1000);
|
||||
Mockito.when(urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE)).thenReturn(contentRange);
|
||||
HttpEntity.openInputStream(uri, "", null, offset);
|
||||
Mockito.verify(inputStreamMock, Mockito.times(0)).skip(offset);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithServerNotSupportingRanges() throws IOException
|
||||
{
|
||||
long offset = 15;
|
||||
Mockito.when(urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE)).thenReturn(null);
|
||||
HttpEntity.openInputStream(uri, "", null, offset);
|
||||
Mockito.verify(inputStreamMock, Mockito.times(1)).skip(offset);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithServerNotSupportingBytesRanges() throws IOException
|
||||
{
|
||||
long offset = 15;
|
||||
Mockito.when(urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE)).thenReturn("token 2-12/12");
|
||||
HttpEntity.openInputStream(uri, "", null, offset);
|
||||
Mockito.verify(inputStreamMock, Mockito.times(1)).skip(offset);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue