diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/EventReceiverFirehoseTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/EventReceiverFirehoseTestClient.java index 9ab6636a723..761d9f55dba 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/EventReceiverFirehoseTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/EventReceiverFirehoseTestClient.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; @@ -41,9 +42,15 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Map; +import java.util.concurrent.ExecutionException; public class EventReceiverFirehoseTestClient { + private static final Logger LOG = new Logger(EventReceiverFirehoseTestClient.class); + + static final int NUM_RETRIES = 30; + static final long DELAY_FOR_RETRIES_MS = 10000; + private final String host; private final ObjectMapper jsonMapper; private final HttpClient httpClient; @@ -82,31 +89,45 @@ public class EventReceiverFirehoseTestClient * @return */ public int postEvents(Collection> events, ObjectMapper objectMapper, String mediaType) + throws InterruptedException { - try { - StatusResponseHolder response = httpClient.go( - new Request(HttpMethod.POST, new URL(getURL())) - .setContent(mediaType, objectMapper.writeValueAsBytes(events)), - StatusResponseHandler.getInstance() - ).get(); + int retryCount = 0; + while (true) { + try { + StatusResponseHolder response = httpClient.go( + new Request(HttpMethod.POST, new URL(getURL())) + .setContent(mediaType, objectMapper.writeValueAsBytes(events)), + StatusResponseHandler.getInstance() + ).get(); - if (!response.getStatus().equals(HttpResponseStatus.OK)) { - throw new ISE( - "Error while posting events to url[%s] status[%s] content[%s]", - getURL(), - response.getStatus(), - response.getContent() + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while posting events to url[%s] status[%s] content[%s]", + getURL(), + response.getStatus(), + response.getContent() + ); + } + Map responseData = objectMapper.readValue( + response.getContent(), new TypeReference>() + { + } ); + return responseData.get("eventCount"); + } + // adding retries to flaky tests using channels + catch (ExecutionException e) { + if (retryCount > NUM_RETRIES) { + throw new RuntimeException(e); //giving up now + } else { + LOG.info(e, "received exception, sleeping and retrying"); + retryCount++; + Thread.sleep(DELAY_FOR_RETRIES_MS); + } + } + catch (Exception e) { + throw new RuntimeException(e); } - Map responseData = objectMapper.readValue( - response.getContent(), new TypeReference>() - { - } - ); - return responseData.get("eventCount"); - } - catch (Exception e) { - throw new RuntimeException(e); } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/HttpUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/HttpUtil.java index 5e011264c47..5e9ad4d384d 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/HttpUtil.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/HttpUtil.java @@ -39,6 +39,9 @@ public class HttpUtil private static final Logger LOG = new Logger(AbstractQueryResourceTestClient.class); private static final StatusResponseHandler RESPONSE_HANDLER = StatusResponseHandler.getInstance(); + static final int NUM_RETRIES = 30; + static final long DELAY_FOR_RETRIES_MS = 10000; + public static StatusResponseHolder makeRequest(HttpClient httpClient, HttpMethod method, String url, byte[] content) { return makeRequestWithExpectedStatus( @@ -78,13 +81,13 @@ public class HttpUtil response.getContent() ); // it can take time for the auth config to propagate, so we retry - if (retryCount > 10) { + if (retryCount > NUM_RETRIES) { throw new ISE(errMsg); } else { LOG.error(errMsg); LOG.error("retrying in 3000ms, retryCount: " + retryCount); retryCount++; - Thread.sleep(3000); + Thread.sleep(DELAY_FOR_RETRIES_MS); } } else { break; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java index 2f3716df11c..2083215d8a7 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java @@ -69,6 +69,9 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes DateTime dtLast; // timestamp of last event DateTime dtGroupBy; // timestamp for expected response for groupBy query + static final int NUM_RETRIES = 60; + static final long DELAY_FOR_RETRIES_MS = 10000; + @Inject ServerDiscoveryFactory factory; @Inject @@ -106,11 +109,14 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes postEvents(); // wait for a while to let the events be ingested - ITRetryUtil.retryUntilTrue( + ITRetryUtil.retryUntil( () -> { final int countRows = queryHelper.countRows(fullDatasourceName, Intervals.ETERNITY.toString()); return countRows == getNumExpectedRowsIngested(); }, + true, + DELAY_FOR_RETRIES_MS, + NUM_RETRIES, "Waiting all events are ingested" ); @@ -152,8 +158,8 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes ITRetryUtil.retryUntil( () -> coordinator.areSegmentsLoaded(fullDatasourceName), true, - 10000, - 60, + DELAY_FOR_RETRIES_MS, + NUM_RETRIES, "Real-time generated segments loaded" );