mirror of https://github.com/apache/druid.git
Make the google storage extension friendlier to 429 and 5XX responses (#5750)
* Make the google extension friendlier to 429 responses * Lots of trouble for a little space * Add in better tests and fix formatting * Add 500 errors as well as some basic unit tests * Add IOException to test * Add some more stuff to killer test * Change error code in puller test * fix tests and make errors more generic handling
This commit is contained in:
parent
adbe22c05b
commit
0fd42af8d6
|
@ -132,7 +132,7 @@ public class StaticGoogleBlobStoreFirehoseFactory extends PrefetchableTextFilesF
|
|||
@Override
|
||||
protected Predicate<Throwable> getRetryCondition()
|
||||
{
|
||||
return GoogleUtils.GOOGLE_RETRY;
|
||||
return GoogleUtils::isRetryable;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,9 +19,11 @@
|
|||
|
||||
package io.druid.storage.google;
|
||||
|
||||
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
|
||||
import com.google.api.client.http.HttpResponseException;
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.java.util.common.MapUtils;
|
||||
import io.druid.java.util.common.RE;
|
||||
import io.druid.java.util.common.RetryUtils;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
import io.druid.segment.loading.DataSegmentKiller;
|
||||
import io.druid.segment.loading.SegmentLoadingException;
|
||||
|
@ -64,14 +66,28 @@ public class GoogleDataSegmentKiller implements DataSegmentKiller
|
|||
private void deleteIfPresent(String bucket, String path) throws IOException
|
||||
{
|
||||
try {
|
||||
storage.delete(bucket, path);
|
||||
RetryUtils.retry(
|
||||
(RetryUtils.Task<Void>) () -> {
|
||||
storage.delete(bucket, path);
|
||||
return null;
|
||||
},
|
||||
GoogleUtils::isRetryable,
|
||||
1,
|
||||
5
|
||||
);
|
||||
}
|
||||
catch (GoogleJsonResponseException e) {
|
||||
catch (HttpResponseException e) {
|
||||
if (e.getStatusCode() != 404) {
|
||||
throw e;
|
||||
}
|
||||
LOG.debug("Already deleted: [%s] [%s]", bucket, path);
|
||||
}
|
||||
catch (IOException ioe) {
|
||||
throw ioe;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RE(e, "Failed to delete [%s] [%s]", bucket, path);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -56,7 +56,7 @@ public class GoogleDataSegmentPuller implements URIDataPuller
|
|||
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
|
||||
byteSource,
|
||||
outDir,
|
||||
GoogleUtils.GOOGLE_RETRY,
|
||||
GoogleUtils::isRetryable,
|
||||
false
|
||||
);
|
||||
LOG.info("Loaded %d bytes from [%s] to [%s]", result.size(), path, outDir.getAbsolutePath());
|
||||
|
@ -107,7 +107,7 @@ public class GoogleDataSegmentPuller implements URIDataPuller
|
|||
if (e == null) {
|
||||
return false;
|
||||
}
|
||||
if (GoogleUtils.GOOGLE_RETRY.apply(e)) {
|
||||
if (GoogleUtils.isRetryable(e)) {
|
||||
return true;
|
||||
}
|
||||
// Look all the way down the cause chain, just in case something wraps it deep.
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
package io.druid.storage.google;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.api.client.http.InputStreamContent;
|
||||
import com.google.api.client.http.FileContent;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.base.Throwables;
|
||||
|
@ -28,6 +28,8 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.java.util.common.CompressionUtils;
|
||||
import io.druid.java.util.common.RE;
|
||||
import io.druid.java.util.common.RetryUtils;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
import io.druid.segment.SegmentUtils;
|
||||
|
@ -35,7 +37,6 @@ import io.druid.segment.loading.DataSegmentPusher;
|
|||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
|
@ -97,13 +98,23 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
|
|||
throws IOException
|
||||
{
|
||||
LOG.info("Inserting [%s] to [%s]", file, path);
|
||||
|
||||
FileInputStream fileSteam = new FileInputStream(file);
|
||||
|
||||
InputStreamContent mediaContent = new InputStreamContent(contentType, fileSteam);
|
||||
mediaContent.setLength(file.length());
|
||||
|
||||
storage.insert(config.getBucket(), path, mediaContent);
|
||||
try {
|
||||
RetryUtils.retry(
|
||||
(RetryUtils.Task<Void>) () -> {
|
||||
storage.insert(config.getBucket(), path, new FileContent(contentType, file));
|
||||
return null;
|
||||
},
|
||||
GoogleUtils::isRetryable,
|
||||
1,
|
||||
5
|
||||
);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw e;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RE(e, "Failed to upload [%s] to [%s]", file, path);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,19 +19,12 @@
|
|||
|
||||
package io.druid.storage.google;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.api.client.http.HttpResponseException;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class GoogleUtils
|
||||
{
|
||||
public static final Predicate<Throwable> GOOGLE_RETRY = new Predicate<Throwable>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(Throwable e)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
public static String toFilename(String path)
|
||||
{
|
||||
String filename = path.substring(path.lastIndexOf("/") + 1); // characters after last '/'
|
||||
|
@ -43,4 +36,13 @@ public class GoogleUtils
|
|||
{
|
||||
return path.substring(0, path.lastIndexOf("/")) + "/index.zip";
|
||||
}
|
||||
|
||||
public static boolean isRetryable(Throwable t)
|
||||
{
|
||||
if (t instanceof HttpResponseException) {
|
||||
final HttpResponseException e = (HttpResponseException) t;
|
||||
return e.getStatusCode() == 429 || (e.getStatusCode() / 500 == 1);
|
||||
}
|
||||
return t instanceof IOException;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,11 +19,15 @@
|
|||
|
||||
package io.druid.storage.google;
|
||||
|
||||
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
|
||||
import com.google.api.client.googleapis.testing.json.GoogleJsonResponseExceptionFactoryTesting;
|
||||
import com.google.api.client.json.jackson2.JacksonFactory;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.druid.java.util.common.Intervals;
|
||||
import io.druid.segment.loading.SegmentLoadingException;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.partition.NoneShardSpec;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.EasyMockSupport;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -36,6 +40,7 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport
|
|||
{
|
||||
private static final String bucket = "bucket";
|
||||
private static final String indexPath = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
|
||||
private static final String descriptorPath = indexPath.substring(0, indexPath.lastIndexOf("/")) + "/descriptor.json";
|
||||
|
||||
private static final DataSegment dataSegment = new DataSegment(
|
||||
"test",
|
||||
|
@ -44,7 +49,7 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport
|
|||
ImmutableMap.<String, Object>of("bucket", bucket, "path", indexPath),
|
||||
null,
|
||||
null,
|
||||
new NoneShardSpec(),
|
||||
NoneShardSpec.instance(),
|
||||
0,
|
||||
1
|
||||
);
|
||||
|
@ -60,11 +65,9 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport
|
|||
@Test
|
||||
public void killTest() throws SegmentLoadingException, IOException
|
||||
{
|
||||
final String descriptorPath = indexPath.substring(0, indexPath.lastIndexOf("/")) + "/descriptor.json";
|
||||
|
||||
storage.delete(bucket, indexPath);
|
||||
storage.delete(EasyMock.eq(bucket), EasyMock.eq(indexPath));
|
||||
expectLastCall();
|
||||
storage.delete(bucket, descriptorPath);
|
||||
storage.delete(EasyMock.eq(bucket), EasyMock.eq(descriptorPath));
|
||||
expectLastCall();
|
||||
|
||||
replayAll();
|
||||
|
@ -79,8 +82,35 @@ public class GoogleDataSegmentKillerTest extends EasyMockSupport
|
|||
@Test(expected = SegmentLoadingException.class)
|
||||
public void killWithErrorTest() throws SegmentLoadingException, IOException
|
||||
{
|
||||
storage.delete(bucket, indexPath);
|
||||
expectLastCall().andThrow(new IOException(""));
|
||||
final GoogleJsonResponseException exception = GoogleJsonResponseExceptionFactoryTesting.newMock(
|
||||
JacksonFactory.getDefaultInstance(),
|
||||
300,
|
||||
"test"
|
||||
);
|
||||
storage.delete(EasyMock.eq(bucket), EasyMock.eq(indexPath));
|
||||
expectLastCall().andThrow(exception);
|
||||
|
||||
replayAll();
|
||||
|
||||
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage);
|
||||
|
||||
killer.kill(dataSegment);
|
||||
|
||||
verifyAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void killRetryWithErrorTest() throws SegmentLoadingException, IOException
|
||||
{
|
||||
final GoogleJsonResponseException exception = GoogleJsonResponseExceptionFactoryTesting.newMock(
|
||||
JacksonFactory.getDefaultInstance(),
|
||||
500,
|
||||
"test"
|
||||
);
|
||||
storage.delete(EasyMock.eq(bucket), EasyMock.eq(indexPath));
|
||||
expectLastCall().andThrow(exception).once().andVoid().once();
|
||||
storage.delete(EasyMock.eq(bucket), EasyMock.eq(descriptorPath));
|
||||
expectLastCall().andThrow(exception).once().andVoid().once();
|
||||
|
||||
replayAll();
|
||||
|
||||
|
|
|
@ -19,8 +19,12 @@
|
|||
|
||||
package io.druid.storage.google;
|
||||
|
||||
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
|
||||
import com.google.api.client.googleapis.testing.json.GoogleJsonResponseExceptionFactoryTesting;
|
||||
import com.google.api.client.json.jackson2.JacksonFactory;
|
||||
import io.druid.segment.loading.SegmentLoadingException;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.EasyMockSupport;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -43,8 +47,12 @@ public class GoogleDataSegmentPullerTest extends EasyMockSupport
|
|||
final File outDir = Files.createTempDirectory("druid").toFile();
|
||||
try {
|
||||
GoogleStorage storage = createMock(GoogleStorage.class);
|
||||
|
||||
expect(storage.get(bucket, path)).andThrow(new IOException(""));
|
||||
final GoogleJsonResponseException exception = GoogleJsonResponseExceptionFactoryTesting.newMock(
|
||||
JacksonFactory.getDefaultInstance(),
|
||||
300,
|
||||
"test"
|
||||
);
|
||||
expect(storage.get(EasyMock.eq(bucket), EasyMock.eq(path))).andThrow(exception);
|
||||
|
||||
replayAll();
|
||||
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.storage.google;
|
||||
|
||||
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
|
||||
import com.google.api.client.http.HttpHeaders;
|
||||
import com.google.api.client.http.HttpResponseException;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class GoogleUtilsTest
|
||||
{
|
||||
@Test
|
||||
public void test429()
|
||||
{
|
||||
Assert.assertTrue(
|
||||
GoogleUtils.isRetryable(
|
||||
new GoogleJsonResponseException(
|
||||
new HttpResponseException.Builder(429, "ignored", new HttpHeaders()),
|
||||
null
|
||||
)
|
||||
)
|
||||
);
|
||||
Assert.assertTrue(
|
||||
GoogleUtils.isRetryable(
|
||||
new HttpResponseException.Builder(429, "ignored", new HttpHeaders()).build()
|
||||
)
|
||||
);
|
||||
Assert.assertTrue(
|
||||
GoogleUtils.isRetryable(
|
||||
new HttpResponseException.Builder(500, "ignored", new HttpHeaders()).build()
|
||||
)
|
||||
);
|
||||
Assert.assertTrue(
|
||||
GoogleUtils.isRetryable(
|
||||
new HttpResponseException.Builder(503, "ignored", new HttpHeaders()).build()
|
||||
)
|
||||
);
|
||||
Assert.assertTrue(
|
||||
GoogleUtils.isRetryable(
|
||||
new HttpResponseException.Builder(599, "ignored", new HttpHeaders()).build()
|
||||
)
|
||||
);
|
||||
Assert.assertFalse(
|
||||
GoogleUtils.isRetryable(
|
||||
new GoogleJsonResponseException(
|
||||
new HttpResponseException.Builder(404, "ignored", new HttpHeaders()),
|
||||
null
|
||||
)
|
||||
)
|
||||
);
|
||||
Assert.assertFalse(
|
||||
GoogleUtils.isRetryable(
|
||||
new HttpResponseException.Builder(404, "ignored", new HttpHeaders()).build()
|
||||
)
|
||||
);
|
||||
Assert.assertTrue(
|
||||
GoogleUtils.isRetryable(
|
||||
new IOException("generic io exception")
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue