mirror of https://github.com/apache/druid.git
Add azure kill test (#15833)
* Add kill test * Extra line * Don't need toString * Add back test * Remove newline * move kill verification into main test
This commit is contained in:
parent
57e12df352
commit
d703b2c709
|
@ -19,11 +19,16 @@
|
||||||
|
|
||||||
package org.apache.druid.testsEx.indexer;
|
package org.apache.druid.testsEx.indexer;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
import org.apache.druid.testsEx.utils.AzureTestUtil;
|
import org.apache.druid.testsEx.utils.AzureTestUtil;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -69,4 +74,22 @@ public class AbstractAzureInputSourceParallelIndexTest extends AbstractCloudInpu
|
||||||
LOG.warn(e, "Unable to delete container in azure");
|
LOG.warn(e, "Unable to delete container in azure");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void validateAzureSegmentFilesDeleted(String path)
|
||||||
|
{
|
||||||
|
List<URI> segmentFiles = ImmutableList.of();
|
||||||
|
try {
|
||||||
|
segmentFiles = azure.listFiles(path);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
LOG.warn(e, "Failed to validate that azure segment files were deleted.");
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
Assert.assertEquals(
|
||||||
|
"Some segment files were not deleted: " + segmentFiles,
|
||||||
|
segmentFiles.size(),
|
||||||
|
0
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -134,8 +134,9 @@ public abstract class AbstractCloudInputSourceParallelIndexTest extends Abstract
|
||||||
* @param segmentAvailabilityConfirmationPair set lhs in the pair to true if you want to confirm that the task waited longer than 0ms for the task to complete.
|
* @param segmentAvailabilityConfirmationPair set lhs in the pair to true if you want to confirm that the task waited longer than 0ms for the task to complete.
|
||||||
* set rhs to true to verify that the segment is actually available.
|
* set rhs to true to verify that the segment is actually available.
|
||||||
* @param inputSourceType Input source type (eg : s3, gcs, azure)
|
* @param inputSourceType Input source type (eg : s3, gcs, azure)
|
||||||
|
* @return The datasource used to test.
|
||||||
*/
|
*/
|
||||||
void doTest(
|
String doTest(
|
||||||
Pair<String, List<?>> inputSource,
|
Pair<String, List<?>> inputSource,
|
||||||
Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair,
|
Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair,
|
||||||
String inputSourceType
|
String inputSourceType
|
||||||
|
@ -200,6 +201,7 @@ public abstract class AbstractCloudInputSourceParallelIndexTest extends Abstract
|
||||||
true,
|
true,
|
||||||
segmentAvailabilityConfirmationPair
|
segmentAvailabilityConfirmationPair
|
||||||
);
|
);
|
||||||
|
return indexDatasource;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -47,6 +47,7 @@ public class ITAzureV2ParallelIndexTest extends AbstractAzureInputSourceParallel
|
||||||
@Parameters(method = "resources")
|
@Parameters(method = "resources")
|
||||||
public void testAzureIndexData(Pair<String, List<?>> azureInputSource) throws Exception
|
public void testAzureIndexData(Pair<String, List<?>> azureInputSource) throws Exception
|
||||||
{
|
{
|
||||||
doTest(azureInputSource, new Pair<>(false, false), "azureStorage");
|
String dataSource = doTest(azureInputSource, new Pair<>(false, false), "azureStorage");
|
||||||
|
AbstractAzureInputSourceParallelIndexTest.validateAzureSegmentFilesDeleted("segments" + "/" + dataSource);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,10 +29,13 @@ import org.apache.druid.testing.utils.ITRetryUtil;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.security.InvalidKeyException;
|
import java.security.InvalidKeyException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
public class AzureTestUtil
|
public class AzureTestUtil
|
||||||
{
|
{
|
||||||
|
@ -130,4 +133,20 @@ public class AzureTestUtil
|
||||||
LOG.info("Uploading file " + DRUID_CLOUD_PATH + '/' + source.getName() + " in azure container " + AZURE_CONTAINER);
|
LOG.info("Uploading file " + DRUID_CLOUD_PATH + '/' + source.getName() + " in azure container " + AZURE_CONTAINER);
|
||||||
blob.upload(Files.newInputStream(source.toPath()), source.length());
|
blob.upload(Files.newInputStream(source.toPath()), source.length());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a list of files under a path to be used for verification of kill tasks.
|
||||||
|
*
|
||||||
|
* @param filePath path to look for files under
|
||||||
|
*/
|
||||||
|
public List<URI> listFiles(String filePath) throws URISyntaxException, StorageException
|
||||||
|
{
|
||||||
|
// Retrieve reference to a previously created container.
|
||||||
|
CloudBlobContainer container = azureStorageClient.getContainerReference(AZURE_CONTAINER);
|
||||||
|
List<URI> activeFiles = new ArrayList<>();
|
||||||
|
container.listBlobs(DRUID_CLOUD_PATH + '/' + filePath).iterator().forEachRemaining(
|
||||||
|
blob -> activeFiles.add(blob.getUri())
|
||||||
|
);
|
||||||
|
return activeFiles;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue