Use unique segment paths for Kafka indexing (#5692)

* support unique segment file paths

* forbiddenapis

* code review changes

* code review changes

* code review changes

* checkstyle fix
David Lim 2018-04-29 22:59:48 -06:00 committed by Gian Merlino
parent 762f8829e4
commit 8ec2d2fe18
59 changed files with 966 additions and 536 deletions

View File

@ -20,8 +20,11 @@
package io.druid.segment.loading;
import io.druid.guice.annotations.ExtensionPoint;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.logger.Logger;
import io.druid.timeline.DataSegment;
import java.util.Map;
import java.util.Set;
/**
@ -31,6 +34,8 @@ import java.util.Set;
@ExtensionPoint
public interface DataSegmentFinder
{
Logger log = new Logger(DataSegmentFinder.class);
/**
* This method should first recursively look for descriptor.json (partitionNum_descriptor.json for HDFS data storage) underneath
* workingDirPath and then verify that index.zip (partitionNum_index.zip for HDFS data storage) exists in the same folder.
@ -46,4 +51,26 @@ public interface DataSegmentFinder
* @return a set of segments that were found underneath workingDirPath
*/
Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException;
/**
* Adds dataSegment to timestampedSegments if it is not already present. If an entry exists, it is replaced only if
* segmentModifiedAt is newer than the stored timestamp.
*
* @param timestampedSegments map of <segmentID, Pair<segment, modifiedAt>> containing segments with modified time
* @param dataSegment segment to add
* @param segmentModifiedAt segment modified timestamp
*/
static void putInMapRetainingNewest(
Map<String, Pair<DataSegment, Long>> timestampedSegments, DataSegment dataSegment, long segmentModifiedAt
)
{
timestampedSegments.merge(
dataSegment.getIdentifier(),
Pair.of(dataSegment, segmentModifiedAt),
(previous, current) -> {
log.warn("Multiple copies of segmentId [%s] found, using newest version", current.lhs.getIdentifier());
return previous.rhs > current.rhs ? previous : current;
}
);
}
}
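
For orientation, here is a minimal, self-contained sketch (not part of this patch) of how a finder implementation uses the merge helper above. The class name, segment values, timestamps, and paths are illustrative; the DataSegment constructor mirrors the one used by the tests later in this diff.

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class PutInMapRetainingNewestSketch
{
  // Builds a segment with a fixed identifier (dataSource/interval/version/shard) and the given
  // deep storage path, mirroring the constructor used by the tests in this patch.
  private static DataSegment segmentWithPath(String path)
  {
    return new DataSegment(
        "foo",
        Intervals.of("2015/2016"),
        "v1",
        ImmutableMap.<String, Object>of("type", "hdfs", "path", path),
        Lists.<String>newArrayList(),
        Lists.<String>newArrayList(),
        NoneShardSpec.instance(),
        0,
        1024
    );
  }

  public static void main(String[] args)
  {
    // Two copies of the same segment ID found in deep storage, e.g. pushed under different
    // unique paths by replica tasks; only the load specs differ (paths are made up).
    final DataSegment olderCopy = segmentWithPath("/druid/foo/0_aaaa_index.zip");
    final DataSegment newerCopy = segmentWithPath("/druid/foo/0_bbbb_index.zip");

    final Map<String, Pair<DataSegment, Long>> timestampedSegments = new HashMap<>();
    DataSegmentFinder.putInMapRetainingNewest(timestampedSegments, olderCopy, 1000L);
    // Same identifier but a newer modification time: logs a warning and keeps newerCopy.
    DataSegmentFinder.putInMapRetainingNewest(timestampedSegments, newerCopy, 2000L);

    // Collapse to the surviving segments, as the HDFS and S3 finders below do.
    final Set<DataSegment> segments =
        timestampedSegments.values().stream().map(x -> x.lhs).collect(Collectors.toSet());
    System.out.println(segments.size());                          // 1
    System.out.println(segments.iterator().next().getLoadSpec()); // points at 0_bbbb_index.zip
  }
}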

View File

@ -20,16 +20,41 @@
package io.druid.segment.loading;
import io.druid.guice.annotations.ExtensionPoint;
import io.druid.java.util.common.logger.Logger;
import io.druid.timeline.DataSegment;
import java.io.IOException;
/**
*/
@ExtensionPoint
public interface DataSegmentKiller
{
void kill(DataSegment segments) throws SegmentLoadingException;
void killAll() throws IOException;
Logger log = new Logger(DataSegmentKiller.class);
/**
* Removes segment files (index and metadata) from deep storage.
* @param segment the segment to kill
* @throws SegmentLoadingException if the segment could not be completely removed
*/
void kill(DataSegment segment) throws SegmentLoadingException;
/**
* A more stoic killer who doesn't throw a tantrum if things get messy. Use when killing segments for best-effort
* cleanup.
* @param segment the segment to kill
*/
default void killQuietly(DataSegment segment)
{
try {
kill(segment);
}
catch (Exception e) {
log.debug(e, "Failed to kill segment %s", segment);
}
}
/**
* Like a nuke. Use wisely. Used by the 'reset-cluster' command; of the built-in deep storage implementations, only
* local and HDFS implement it.
*/
void killAll() throws IOException;
}
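
The "best-effort cleanup" case above is exactly the unique-path scenario introduced by this patch: segments that were pushed to deep storage but never committed to the metadata table must eventually be removed by the caller. A hedged sketch of that pattern follows (the helper class is hypothetical, not part of Druid).

import io.druid.segment.loading.DataSegmentKiller;
import io.druid.timeline.DataSegment;

public class UncommittedSegmentCleanup
{
  // Best-effort removal of segments that were pushed under unique paths but never written to
  // the metadata table (e.g. by a failed task or by the losing replica).
  public static void cleanUp(DataSegmentKiller killer, Iterable<DataSegment> uncommitted)
  {
    for (DataSegment segment : uncommitted) {
      // killQuietly() logs failures at DEBUG instead of throwing, so one flaky deep storage
      // call cannot abort the rest of the cleanup or mask the task's original error.
      killer.killQuietly(segment);
    }
  }
}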

View File

@ -30,6 +30,7 @@ import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ExtensionPoint
public interface DataSegmentPusher
@ -44,36 +45,48 @@ public interface DataSegmentPusher
* Pushes index files and segment descriptor to deep storage.
* @param file directory containing index files
* @param segment segment descriptor
* @param replaceExisting overwrites existing objects if true, else leaves existing objects unchanged on conflict.
* The behavior of the indexer determines whether this should be true or false. For example,
* since Tranquility does not guarantee that replica tasks will generate indexes with the same
* data, the first segment pushed should be favored since otherwise multiple historicals may
* load segments with the same identifier but different contents which is a bad situation. On
* the other hand, indexers that maintain exactly-once semantics by storing checkpoint data can
* lose or repeat data if it fails to write a segment because it already exists and overwriting
* is not permitted. This situation can occur if a task fails after pushing to deep storage but
* before writing to the metadata storage, see: https://github.com/druid-io/druid/issues/5161.
* @param useUniquePath if true, pushes to a unique file path. This prevents situations where task failures or replica
* tasks can either overwrite or fail to overwrite existing segments leading to the possibility
* of different versions of the same segment ID containing different data. As an example, a Kafka
* indexing task starting at offset A and ending at offset B may push a segment to deep storage
* and then fail before writing the loadSpec to the metadata table, resulting in a replacement
* task being spawned. This replacement will also start at offset A but will read to offset C and
* will then push a segment to deep storage and write the loadSpec metadata. Without unique file
* paths, this can only work correctly if new segments overwrite existing segments. Suppose that
* at this point the task then fails so that the supervisor retries again from offset A. This 3rd
* attempt will overwrite the segments in deep storage before failing to write the loadSpec
* metadata, resulting in inconsistencies in the segment data now in deep storage and copies of
* the segment already loaded by historicals.
*
* If replaceExisting is true, existing objects MUST be overwritten, since failure to do so
* will break exactly-once semantics. If replaceExisting is false, existing objects SHOULD be
* prioritized but it is acceptable if they are overwritten (deep storages may be eventually
* consistent or otherwise unable to support transactional writes).
* If unique paths are used, caller is responsible for cleaning up segments that were pushed but
* were not written to the metadata table (for example when using replica tasks).
* @return segment descriptor
* @throws IOException
*/
DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException;
DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException;
//use map instead of LoadSpec class to avoid dependency pollution.
Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath);
/**
* @deprecated backward-compatibility shim that should be removed on next major release;
* use {@link #getStorageDir(DataSegment, boolean)} instead.
*/
@Deprecated
default String getStorageDir(DataSegment dataSegment)
{
return getDefaultStorageDir(dataSegment);
return getStorageDir(dataSegment, false);
}
default String getStorageDir(DataSegment dataSegment, boolean useUniquePath)
{
return getDefaultStorageDir(dataSegment, useUniquePath);
}
default String makeIndexPathName(DataSegment dataSegment, String indexName)
{
return StringUtils.format("./%s/%s", getStorageDir(dataSegment), indexName);
// This is only called from Hadoop batch which doesn't require unique segment paths so set useUniquePath=false
return StringUtils.format("./%s/%s", getStorageDir(dataSegment, false), indexName);
}
/**
@ -89,13 +102,19 @@ public interface DataSegmentPusher
// If above format is ever changed, make sure to change it appropriately in other places
// e.g. HDFSDataSegmentKiller uses this information to clean the version, interval and dataSource directories
// on segment deletion if segment being deleted was the only segment
static String getDefaultStorageDir(DataSegment segment)
static String getDefaultStorageDir(DataSegment segment, boolean useUniquePath)
{
return JOINER.join(
segment.getDataSource(),
StringUtils.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
segment.getVersion(),
segment.getShardSpec().getPartitionNum()
segment.getShardSpec().getPartitionNum(),
useUniquePath ? generateUniquePath() : null
);
}
static String generateUniquePath()
{
return UUID.randomUUID().toString();
}
}
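
To make the new layout concrete, here is a hedged sketch of the directories getDefaultStorageDir() produces with and without the flag. The segment values are illustrative and the paths in the comments are approximate (the interval portion is Joda-Time's ISO output).

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.java.util.common.Intervals;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;

public class StorageDirSketch
{
  public static void main(String[] args)
  {
    final DataSegment segment = new DataSegment(
        "foo",
        Intervals.of("2015/2016"),
        "v1",
        ImmutableMap.<String, Object>of(),
        Lists.<String>newArrayList(),
        Lists.<String>newArrayList(),
        NoneShardSpec.instance(),
        0,
        1024
    );

    // With useUniquePath == false the layout is unchanged from previous releases, roughly:
    //   foo/2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z/v1/0
    System.out.println(DataSegmentPusher.getDefaultStorageDir(segment, false));

    // With useUniquePath == true a random UUID directory is appended, roughly:
    //   foo/2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z/v1/0/<random UUID>
    System.out.println(DataSegmentPusher.getDefaultStorageDir(segment, true));
  }
}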

View File

@ -5,24 +5,43 @@ layout: doc_page
`insert-segment-to-db` is a tool that can insert segments into Druid metadata storage. It is intended to be used
to update the segment table in metadata storage after people manually migrate segments from one place to another.
It can also be used to insert missing segment into Druid, or even recover metadata storage by telling it where the
It can also be used to insert missing segments into Druid, or even recover metadata storage by telling it where the
segments are stored.
Note: This tool expects users to have Druid cluster running in a "safe" mode, where there are no active tasks to interfere
the segments being inserted. Users can optionally bring down the cluster to make 100% sure nothing is interfering.
**Note:** This tool simply scans the deep storage directory to reconstruct the metadata entries used to locate and
identify each segment. It does not have any understanding about whether those segments _should actually_ be written to
the metadata storage. In certain cases, this can lead to undesired or inconsistent results. Some examples of things to
watch out for:
- Dropped datasources will be re-enabled.
- The latest version of each segment set will be loaded by Druid, which in some cases may not be the version you
actually want. An example of this is a bad compaction job that generates segments which need to be manually rolled
back by removing that version from the metadata table. If these segments are not also removed from deep storage,
they will be imported back into the metadata table and overshadow the correct version.
- Some indexers such as the Kafka indexing service have the potential to generate more than one set of segments that
have the same segment ID but different contents. When the metadata is first written, the correct set of segments is
referenced and the other set is normally deleted from deep storage. It is possible however that an unhandled
exception could result in multiple sets of segments with the same segment ID remaining in deep storage. Since this
tool does not know which one is the 'correct' one to use, it will simply select the newest segment set and ignore
the other versions. If the wrong segment set is picked, the exactly-once semantics of the Kafka indexing service
will no longer hold true and you may get duplicated or dropped events.
With these considerations in mind, it is recommended that data migrations be done by exporting the original metadata
storage directly, since that is the definitive cluster state. This tool should be used as a last resort when a direct
export is not possible.
**Note:** This tool expects users to have a Druid cluster running in a "safe" mode, where there are no active tasks to interfere
with the segments being inserted. Users can optionally bring down the cluster to make 100% sure nothing is interfering.
In order to make it work, the user will have to provide metadata storage credentials and the deep storage type through Java JVM arguments
or runtime.properties file. Specifically, this tool needs to know
or runtime.properties file. Specifically, this tool needs to know:
`druid.metadata.storage.type`
`druid.metadata.storage.connector.connectURI`
`druid.metadata.storage.connector.user`
`druid.metadata.storage.connector.password`
`druid.storage.type`
```
druid.metadata.storage.type
druid.metadata.storage.connector.connectURI
druid.metadata.storage.connector.user
druid.metadata.storage.connector.password
druid.storage.type
```
Besides the properties above, you also need to specify the location where the segments are stored and whether you want to
update descriptor.json (`partitionNum_descriptor.json` for HDFS data storage). These two can be provided through command line arguments.

View File

@ -85,7 +85,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
}
@Override
public String getStorageDir(DataSegment dataSegment)
public String getStorageDir(DataSegment dataSegment, boolean useUniquePath)
{
String seg = JOINER.join(
dataSegment.getDataSource(),
@ -96,7 +96,8 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
dataSegment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
),
dataSegment.getVersion().replace(":", "_"),
dataSegment.getShardSpec().getPartitionNum()
dataSegment.getShardSpec().getPartitionNum(),
useUniquePath ? DataSegmentPusher.generateUniquePath() : null
);
log.info("DataSegment: [%s]", seg);
@ -122,9 +123,9 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
return descriptorFile;
}
public Map<String, String> getAzurePaths(final DataSegment segment)
public Map<String, String> getAzurePaths(final DataSegment segment, final boolean useUniquePath)
{
final String storageDir = this.getStorageDir(segment);
final String storageDir = this.getStorageDir(segment, useUniquePath);
return ImmutableMap.of(
"index", StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME),
@ -139,13 +140,12 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
final long size,
final File compressedSegmentData,
final File descriptorFile,
final Map<String, String> azurePaths,
final boolean replaceExisting
final Map<String, String> azurePaths
)
throws StorageException, IOException, URISyntaxException
{
azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePaths.get("index"), replaceExisting);
azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"), replaceExisting);
azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePaths.get("index"));
azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"));
final DataSegment outSegment = segment
.withSize(size)
@ -162,7 +162,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
}
@Override
public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean replaceExisting)
public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean useUniquePath)
throws IOException
{
log.info("Uploading [%s] to Azure.", indexFilesDir);
@ -176,10 +176,10 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
final long size = CompressionUtils.zip(indexFilesDir, zipOutFile);
final File descFile = descriptorFile = createSegmentDescriptorFile(jsonMapper, segment);
final Map<String, String> azurePaths = getAzurePaths(segment);
final Map<String, String> azurePaths = getAzurePaths(segment, useUniquePath);
return AzureUtils.retryAzureOperation(
() -> uploadDataSegment(segment, binaryVersion, size, outFile, descFile, azurePaths, replaceExisting),
() -> uploadDataSegment(segment, binaryVersion, size, outFile, descFile, azurePaths),
config.getMaxTries()
);
}

View File

@ -23,7 +23,6 @@ import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
import io.druid.java.util.common.logger.Logger;
@ -77,27 +76,14 @@ public class AzureStorage
}
return deletedFiles;
}
public void uploadBlob(
final File file,
final String containerName,
final String blobPath,
final boolean replaceExisting
)
public void uploadBlob(final File file, final String containerName, final String blobPath)
throws IOException, StorageException, URISyntaxException
{
CloudBlobContainer container = getCloudBlobContainer(containerName);
try (FileInputStream stream = new FileInputStream(file)) {
CloudBlockBlob blob = container.getBlockBlobReference(blobPath);
if (!replaceExisting && blob.exists()) {
log.info("Skipping push because blob [%s] exists && replaceExisting == false", blobPath);
} else {
blob.upload(stream, file.length());
}
container.getBlockBlobReference(blobPath).upload(stream, file.length());
}
}

View File

@ -70,7 +70,7 @@ public class AzureTaskLogs implements TaskLogs
try {
AzureUtils.retryAzureOperation(
() -> {
azureStorage.uploadBlob(logFile, config.getContainer(), taskKey, true);
azureStorage.uploadBlob(logFile, config.getContainer(), taskKey);
return null;
},
config.getMaxTries()

View File

@ -82,6 +82,17 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
@Test
public void testPush() throws Exception
{
testPushInternal(false, "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/index\\.zip");
}
@Test
public void testPushUseUniquePath() throws Exception
{
testPushInternal(true, "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip");
}
private void testPushInternal(boolean useUniquePath, String matcher) throws Exception
{
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
@ -104,7 +115,12 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
size
);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, useUniquePath);
Assert.assertTrue(
segment.getLoadSpec().get("blobPath").toString(),
segment.getLoadSpec().get("blobPath").toString().matches(matcher)
);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
}
@ -114,10 +130,13 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
{
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
final String storageDir = pusher.getStorageDir(dataSegment);
Map<String, String> paths = pusher.getAzurePaths(dataSegment);
final String storageDir = pusher.getStorageDir(dataSegment, false);
Map<String, String> paths = pusher.getAzurePaths(dataSegment, false);
assertEquals(StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME), paths.get("index"));
assertEquals(
StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME),
paths.get("index")
);
assertEquals(
StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.DESCRIPTOR_FILE_NAME),
paths.get("descriptor")
@ -131,11 +150,11 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
final int binaryVersion = 9;
final File compressedSegmentData = new File("index.zip");
final File descriptorFile = new File("descriptor.json");
final Map<String, String> azurePaths = pusher.getAzurePaths(dataSegment);
final Map<String, String> azurePaths = pusher.getAzurePaths(dataSegment, false);
azureStorage.uploadBlob(compressedSegmentData, containerName, azurePaths.get("index"), true);
azureStorage.uploadBlob(compressedSegmentData, containerName, azurePaths.get("index"));
expectLastCall();
azureStorage.uploadBlob(descriptorFile, containerName, azurePaths.get("descriptor"), true);
azureStorage.uploadBlob(descriptorFile, containerName, azurePaths.get("descriptor"));
expectLastCall();
replayAll();
@ -146,8 +165,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
0, // empty file
compressedSegmentData,
descriptorFile,
azurePaths,
true
azurePaths
);
assertEquals(compressedSegmentData.length(), pushedDataSegment.getSize());
@ -172,7 +190,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
{
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
DataSegment withColons = dataSegment.withVersion("2018-01-05T14:54:09.295Z");
String segmentPath = pusher.getStorageDir(withColons);
String segmentPath = pusher.getStorageDir(withColons, false);
Assert.assertFalse("Path should not contain any columns", segmentPath.contains(":"));
}
}

View File

@ -65,7 +65,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
try {
final File logFile = new File(tmpDir, "log");
azureStorage.uploadBlob(logFile, container, prefix + "/" + taskid + "/log", true);
azureStorage.uploadBlob(logFile, container, prefix + "/" + taskid + "/log");
expectLastCall();
replayAll();

View File

@ -75,13 +75,12 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
}
@Override
public DataSegment push(final File indexFilesDir, DataSegment segment, final boolean replaceExisting)
throws IOException
public DataSegment push(final File indexFilesDir, DataSegment segment, final boolean useUniquePath) throws IOException
{
log.info("Writing [%s] to C*", indexFilesDir);
String key = JOINER.join(
config.getKeyspace().isEmpty() ? null : config.getKeyspace(),
this.getStorageDir(segment)
this.getStorageDir(segment, useUniquePath)
);
// Create index
@ -92,20 +91,16 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
int version = SegmentUtils.getVersionFromDir(indexFilesDir);
try {
if (!replaceExisting && doesObjectExist(indexStorage, key)) {
log.info("Skipping push because key [%s] exists && replaceExisting == false", key);
} else {
long start = System.currentTimeMillis();
ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile))
.withConcurrencyLevel(CONCURRENCY).call();
byte[] json = jsonMapper.writeValueAsBytes(segment);
MutationBatch mutation = this.keyspace.prepareMutationBatch();
mutation.withRow(descriptorStorage, key)
.putColumn("lastmodified", System.currentTimeMillis(), null)
.putColumn("descriptor", json, null);
mutation.execute();
log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start);
}
long start = System.currentTimeMillis();
ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile))
.withConcurrencyLevel(CONCURRENCY).call();
byte[] json = jsonMapper.writeValueAsBytes(segment);
MutationBatch mutation = this.keyspace.prepareMutationBatch();
mutation.withRow(descriptorStorage, key)
.putColumn("lastmodified", System.currentTimeMillis(), null)
.putColumn("descriptor", json, null);
mutation.execute();
log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start);
}
catch (Exception e) {
throw new IOException(e);

View File

@ -72,9 +72,12 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
}
@Override
public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean replaceExisting)
public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean useUniquePath)
{
final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), getStorageDir(inSegment));
final String segmentPath = CloudFilesUtils.buildCloudFilesPath(
this.config.getBasePath(),
getStorageDir(inSegment, useUniquePath)
);
File descriptorFile = null;
File zipOutFile = null;
@ -93,22 +96,18 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
objectApi.getContainer()
);
if (!replaceExisting && objectApi.exists(segmentData.getPath())) {
log.info("Skipping push because object [%s] exists && replaceExisting == false", segmentData.getPath());
} else {
log.info("Pushing %s.", segmentData.getPath());
objectApi.put(segmentData);
log.info("Pushing %s.", segmentData.getPath());
objectApi.put(segmentData);
// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment));
CloudFilesObject descriptorData = new CloudFilesObject(
segmentPath, descFile,
objectApi.getRegion(), objectApi.getContainer()
);
log.info("Pushing %s.", descriptorData.getPath());
objectApi.put(descriptorData);
}
// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment));
CloudFilesObject descriptorData = new CloudFilesObject(
segmentPath, descFile,
objectApi.getRegion(), objectApi.getContainer()
);
log.info("Pushing %s.", descriptorData.getPath());
objectApi.put(descriptorData);
final DataSegment outSegment = inSegment
.withSize(indexSize)

View File

@ -84,7 +84,7 @@ public class CloudFilesDataSegmentPusherTest
size
);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, false);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());

View File

@ -93,7 +93,7 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
return descriptorFile;
}
public void insert(final File file, final String contentType, final String path, final boolean replaceExisting)
public void insert(final File file, final String contentType, final String path)
throws IOException
{
LOG.info("Inserting [%s] to [%s]", file, path);
@ -103,15 +103,11 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
InputStreamContent mediaContent = new InputStreamContent(contentType, fileSteam);
mediaContent.setLength(file.length());
if (!replaceExisting && storage.exists(config.getBucket(), path)) {
LOG.info("Skipping push because path [%s] exists && replaceExisting == false", path);
} else {
storage.insert(config.getBucket(), path, mediaContent);
}
storage.insert(config.getBucket(), path, mediaContent);
}
@Override
public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean replaceExisting)
public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean useUniquePath)
throws IOException
{
LOG.info("Uploading [%s] to Google.", indexFilesDir);
@ -123,7 +119,7 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
try {
indexFile = File.createTempFile("index", ".zip");
final long indexSize = CompressionUtils.zip(indexFilesDir, indexFile);
final String storageDir = this.getStorageDir(segment);
final String storageDir = this.getStorageDir(segment, useUniquePath);
final String indexPath = buildPath(storageDir + "/" + "index.zip");
final String descriptorPath = buildPath(storageDir + "/" + "descriptor.json");
@ -134,8 +130,8 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
descriptorFile = createDescriptorFile(jsonMapper, outSegment);
insert(indexFile, "application/zip", indexPath, replaceExisting);
insert(descriptorFile, "application/json", descriptorPath, replaceExisting);
insert(indexFile, "application/zip", indexPath);
insert(descriptorFile, "application/json", descriptorPath);
return outSegment;
}

View File

@ -78,7 +78,7 @@ public class GoogleDataSegmentPusherTest extends EasyMockSupport
"foo",
Intervals.of("2015/2016"),
"0",
Maps.<String, Object>newHashMap(),
Maps.newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
@ -92,30 +92,28 @@ public class GoogleDataSegmentPusherTest extends EasyMockSupport
storage,
googleAccountConfig,
jsonMapper
).addMockedMethod("insert", File.class, String.class, String.class, boolean.class).createMock();
).addMockedMethod("insert", File.class, String.class, String.class).createMock();
final String storageDir = pusher.getStorageDir(segmentToPush);
final String storageDir = pusher.getStorageDir(segmentToPush, false);
final String indexPath = prefix + "/" + storageDir + "/" + "index.zip";
final String descriptorPath = prefix + "/" + storageDir + "/" + "descriptor.json";
pusher.insert(
EasyMock.anyObject(File.class),
EasyMock.eq("application/zip"),
EasyMock.eq(indexPath),
EasyMock.eq(true)
EasyMock.eq(indexPath)
);
expectLastCall();
pusher.insert(
EasyMock.anyObject(File.class),
EasyMock.eq("application/json"),
EasyMock.eq(descriptorPath),
EasyMock.eq(true)
EasyMock.eq(descriptorPath)
);
expectLastCall();
replayAll();
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, false);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
Assert.assertEquals(segmentToPush, segment);

View File

@ -20,8 +20,9 @@
package io.druid.storage.hdfs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentFinder;
@ -34,14 +35,15 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
*/
public class HdfsDataSegmentFinder implements DataSegmentFinder
{
private static final Logger log = new Logger(HdfsDataSegmentFinder.class);
private final Configuration config;
@ -58,7 +60,7 @@ public class HdfsDataSegmentFinder implements DataSegmentFinder
public Set<DataSegment> findSegments(String workingDirPathStr, boolean updateDescriptor)
throws SegmentLoadingException
{
final Set<DataSegment> segments = Sets.newHashSet();
final Map<String, Pair<DataSegment, Long>> timestampedSegments = new HashMap<>();
final Path workingDirPath = new Path(workingDirPathStr);
FileSystem fs;
try {
@ -80,15 +82,31 @@ public class HdfsDataSegmentFinder implements DataSegmentFinder
final LocatedFileStatus locatedFileStatus = it.next();
final Path path = locatedFileStatus.getPath();
if (path.getName().endsWith("descriptor.json")) {
final Path indexZip;
// There are 3 supported path formats:
// - hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum/descriptor.json
// - hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_descriptor.json
// - hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_UUID_descriptor.json
final String descriptorParts[] = path.getName().split("_");
if (descriptorParts.length == 2
&& descriptorParts[1].equals("descriptor.json")
&& org.apache.commons.lang.StringUtils.isNumeric(descriptorParts[0])) {
indexZip = new Path(path.getParent(), StringUtils.format("%s_index.zip", descriptorParts[0]));
} else {
indexZip = new Path(path.getParent(), "index.zip");
Path indexZip = new Path(path.getParent(), "index.zip");
if (descriptorParts.length > 1) {
Preconditions.checkState(descriptorParts.length <= 3 &&
org.apache.commons.lang.StringUtils.isNumeric(descriptorParts[0]) &&
"descriptor.json".equals(descriptorParts[descriptorParts.length - 1]),
"Unexpected descriptor filename format [%s]", path
);
indexZip = new Path(
path.getParent(),
StringUtils.format(
"%s_%sindex.zip",
descriptorParts[0],
descriptorParts.length == 2 ? "" : descriptorParts[1] + "_"
)
);
}
if (fs.exists(indexZip)) {
final DataSegment dataSegment = mapper.readValue(fs.open(path), DataSegment.class);
log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip);
@ -105,7 +123,12 @@ public class HdfsDataSegmentFinder implements DataSegmentFinder
mapper.writeValue(fs.create(path, true), dataSegment);
}
}
segments.add(dataSegment);
DataSegmentFinder.putInMapRetainingNewest(
timestampedSegments,
dataSegment,
locatedFileStatus.getModificationTime()
);
} else {
throw new SegmentLoadingException(
"index.zip didn't exist at [%s] while descripter.json exists!?",
@ -119,7 +142,6 @@ public class HdfsDataSegmentFinder implements DataSegmentFinder
throw new SegmentLoadingException(e, "Problems interacting with filesystem[%s].", workingDirPath);
}
return segments;
return timestampedSegments.values().stream().map(x -> x.lhs).collect(Collectors.toSet());
}
}
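
The interleaved old and new lines above are easier to follow as a single mapping, so here is a hedged, Hadoop-free sketch of the filename handling; the helper class and method are illustrative only, not part of the patch. For example, indexZipNameFor("3_descriptor.json") yields "3_index.zip".

import com.google.common.base.Preconditions;

public class HdfsDescriptorNaming
{
  /**
   * Maps a descriptor filename to the index.zip filename expected alongside it, mirroring the
   * three formats handled by HdfsDataSegmentFinder:
   *   descriptor.json                       -> index.zip
   *   {partitionNum}_descriptor.json        -> {partitionNum}_index.zip
   *   {partitionNum}_{UUID}_descriptor.json -> {partitionNum}_{UUID}_index.zip
   */
  public static String indexZipNameFor(String descriptorName)
  {
    final String[] parts = descriptorName.split("_");
    if (parts.length == 1) {
      return "index.zip";
    }
    Preconditions.checkState(
        parts.length <= 3
        && org.apache.commons.lang.StringUtils.isNumeric(parts[0])
        && "descriptor.json".equals(parts[parts.length - 1]),
        "Unexpected descriptor filename format [%s]",
        descriptorName
    );
    // UUIDs contain hyphens but no underscores, so a unique-path filename always splits into
    // exactly three parts.
    return parts.length == 2
           ? parts[0] + "_index.zip"
           : parts[0] + "_" + parts[1] + "_index.zip";
  }
}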

View File

@ -19,6 +19,7 @@
package io.druid.storage.hdfs;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.segment.loading.DataSegmentKiller;
@ -57,65 +58,53 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
public void kill(DataSegment segment) throws SegmentLoadingException
{
final Path segmentPath = getPath(segment);
log.info("killing segment[%s] mapped to path[%s]", segment.getIdentifier(), segmentPath);
log.info("Killing segment[%s] mapped to path[%s]", segment.getIdentifier(), segmentPath);
try {
String segmentLocation = segmentPath.getName();
String filename = segmentPath.getName();
final FileSystem fs = segmentPath.getFileSystem(config);
if (!segmentLocation.endsWith(".zip")) {
if (!filename.endsWith(".zip")) {
throw new SegmentLoadingException("Unknown file type[%s]", segmentPath);
} else {
if (!fs.exists(segmentPath)) {
log.warn("Segment Path [%s] does not exist. It appears to have been deleted already.", segmentPath);
log.warn("Segment path [%s] does not exist", segmentPath);
return;
}
String[] zipParts = segmentLocation.split("_");
// for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_index.zip
if (zipParts.length == 2
&& zipParts[1].equals("index.zip")
&& StringUtils.isNumeric(zipParts[0])) {
if (!fs.delete(segmentPath, false)) {
throw new SegmentLoadingException(
"Unable to kill segment, failed to delete [%s]",
segmentPath.toString()
);
}
Path descriptorPath = new Path(
segmentPath.getParent(),
io.druid.java.util.common.StringUtils.format("%s_descriptor.json", zipParts[0])
);
//delete partitionNumber_descriptor.json
if (!fs.delete(descriptorPath, false)) {
throw new SegmentLoadingException(
"Unable to kill segment, failed to delete [%s]",
descriptorPath.toString()
);
}
//for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_index.zip
// max depth to look is 2, i.e version directory and interval.
mayBeDeleteParentsUpto(fs, segmentPath, 2);
// There are 3 supported path formats:
// - hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum/index.zip
// - hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_index.zip
// - hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_UUID_index.zip
String[] zipParts = filename.split("_");
} else { //for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum/
// index.zip
if (!fs.delete(segmentPath, false)) {
throw new SegmentLoadingException(
"Unable to kill segment, failed to delete [%s]",
segmentPath.toString()
);
}
Path descriptorPath = new Path(segmentPath.getParent(), "descriptor.json");
if (!fs.delete(descriptorPath, false)) {
throw new SegmentLoadingException(
"Unable to kill segment, failed to delete [%s]",
descriptorPath.toString()
);
}
//for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum/index.zip
//max depth to look is 3, i.e partition number directory,version directory and interval.
mayBeDeleteParentsUpto(fs, segmentPath, 3);
Path descriptorPath = new Path(segmentPath.getParent(), "descriptor.json");
if (zipParts.length > 1) {
Preconditions.checkState(zipParts.length <= 3 &&
StringUtils.isNumeric(zipParts[0]) &&
"index.zip".equals(zipParts[zipParts.length - 1]),
"Unexpected segmentPath format [%s]", segmentPath
);
descriptorPath = new Path(
segmentPath.getParent(),
io.druid.java.util.common.StringUtils.format(
"%s_%sdescriptor.json",
zipParts[0],
zipParts.length == 2 ? "" : zipParts[1] + "_"
)
);
}
if (!fs.delete(segmentPath, false)) {
throw new SegmentLoadingException("Unable to kill segment, failed to delete [%s]", segmentPath.toString());
}
if (!fs.delete(descriptorPath, false)) {
throw new SegmentLoadingException("Unable to kill segment, failed to delete [%s]", descriptorPath.toString());
}
removeEmptyParentDirectories(fs, segmentPath, zipParts.length > 1 ? 2 : 3);
}
}
catch (IOException e) {
@ -131,11 +120,11 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
fs.delete(storageDirectory, true);
}
private void mayBeDeleteParentsUpto(final FileSystem fs, final Path segmentPath, final int maxDepthTobeDeleted)
private void removeEmptyParentDirectories(final FileSystem fs, final Path segmentPath, final int depth)
{
Path path = segmentPath;
try {
for (int i = 1; i <= maxDepthTobeDeleted; i++) {
for (int i = 1; i <= depth; i++) {
path = path.getParent();
if (fs.listStatus(path).length != 0 || !fs.delete(path, false)) {
break;

View File

@ -20,6 +20,7 @@
package io.druid.storage.hdfs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
@ -61,11 +62,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
private final Supplier<String> fullyQualifiedStorageDirectory;
@Inject
public HdfsDataSegmentPusher(
HdfsDataSegmentPusherConfig config,
Configuration hadoopConfig,
ObjectMapper jsonMapper
) throws IOException
public HdfsDataSegmentPusher(HdfsDataSegmentPusherConfig config, Configuration hadoopConfig, ObjectMapper jsonMapper)
{
this.hadoopConfig = hadoopConfig;
this.jsonMapper = jsonMapper;
@ -101,9 +98,11 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
}
@Override
public DataSegment push(File inDir, DataSegment segment, boolean replaceExisting) throws IOException
public DataSegment push(final File inDir, final DataSegment segment, final boolean useUniquePath) throws IOException
{
final String storageDir = this.getStorageDir(segment);
// For HDFS, useUniquePath does not affect the directory tree but instead affects the filename, which is of the form
// '{partitionNum}_index.zip' without unique paths and '{partitionNum}_{UUID}_index.zip' with unique paths.
final String storageDir = this.getStorageDir(segment, false);
log.info(
"Copying segment[%s] to HDFS at location[%s/%s]",
@ -128,17 +127,20 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
final DataSegment dataSegment;
try (FSDataOutputStream out = fs.create(tmpIndexFile)) {
size = CompressionUtils.zip(inDir, out);
final String uniquePrefix = useUniquePath ? DataSegmentPusher.generateUniquePath() + "_" : "";
final Path outIndexFile = new Path(StringUtils.format(
"%s/%s/%d_index.zip",
"%s/%s/%d_%sindex.zip",
fullyQualifiedStorageDirectory.get(),
storageDir,
segment.getShardSpec().getPartitionNum()
segment.getShardSpec().getPartitionNum(),
uniquePrefix
));
final Path outDescriptorFile = new Path(StringUtils.format(
"%s/%s/%d_descriptor.json",
"%s/%s/%d_%sdescriptor.json",
fullyQualifiedStorageDirectory.get(),
storageDir,
segment.getShardSpec().getPartitionNum()
segment.getShardSpec().getPartitionNum(),
uniquePrefix
));
dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile.toUri()))
@ -157,8 +159,8 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
// Create parent if it does not exist, recreation is not an error
fs.mkdirs(outIndexFile.getParent());
copyFilesWithChecks(fs, tmpDescriptorFile, outDescriptorFile, replaceExisting);
copyFilesWithChecks(fs, tmpIndexFile, outIndexFile, replaceExisting);
copyFilesWithChecks(fs, tmpDescriptorFile, outDescriptorFile);
copyFilesWithChecks(fs, tmpIndexFile, outIndexFile);
}
finally {
try {
@ -174,19 +176,17 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
return dataSegment;
}
private void copyFilesWithChecks(final FileSystem fs, final Path from, final Path to, final boolean replaceExisting)
throws IOException
private void copyFilesWithChecks(final FileSystem fs, final Path from, final Path to) throws IOException
{
if (!HadoopFsWrapper.rename(fs, from, to, replaceExisting)) {
if (!HadoopFsWrapper.rename(fs, from, to)) {
if (fs.exists(to)) {
log.info(
"Unable to rename temp Index file[%s] to final segment path [%s]. "
+ "It is already pushed by a replica task.",
"Unable to rename temp file [%s] to segment path [%s], it may have already been pushed by a replica task.",
from,
to
);
} else {
throw new IOE("Failed to rename temp Index file[%s] and final segment path[%s] is not present.", from, to);
throw new IOE("Failed to rename temp file [%s] and final segment path [%s] is not present.", from, to);
}
}
}
@ -221,8 +221,17 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
*/
@Override
public String getStorageDir(DataSegment segment)
public String getStorageDir(DataSegment segment, boolean useUniquePath)
{
// This is only called by HdfsDataSegmentPusher.push(), which will always set useUniquePath to false since any
// 'uniqueness' will be applied not to the directory but to the filename along with the shard number. This is done
// to avoid performance issues due to excessive HDFS directories. Hence useUniquePath is ignored here and we
// expect it to be false.
Preconditions.checkArgument(
!useUniquePath,
"useUniquePath must be false for HdfsDataSegmentPusher.getStorageDir()"
);
return JOINER.join(
segment.getDataSource(),
StringUtils.format(
@ -237,9 +246,10 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
@Override
public String makeIndexPathName(DataSegment dataSegment, String indexName)
{
// This is only called from Hadoop batch which doesn't require unique segment paths so set useUniquePath=false
return StringUtils.format(
"./%s/%d_%s",
this.getStorageDir(dataSegment),
this.getStorageDir(dataSegment, false),
dataSegment.getShardSpec().getPartitionNum(),
indexName
);
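
Tying the comment above to the test expectations later in this diff (the 36-character UUID matcher in HdfsDataSegmentPusherTest), here is a hedged sketch of how only the filename, not the directory, changes for HDFS; the helper class is illustrative only.

import java.util.UUID;

public class HdfsSegmentFileNaming
{
  /**
   * Mirrors how HdfsDataSegmentPusher names the pushed zip: the directory stays
   * dataSource/interval/version, and only the filename changes, e.g.
   *   0_index.zip                                        (useUniquePath = false)
   *   0_754f4e11-97de-4b47-8b8d-2a51a87975d4_index.zip   (useUniquePath = true, UUID made up)
   */
  public static String indexZipName(int partitionNum, boolean useUniquePath)
  {
    final String uniquePrefix = useUniquePath ? UUID.randomUUID().toString() + "_" : "";
    return partitionNum + "_" + uniquePrefix + "index.zip";
  }
}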

View File

@ -44,17 +44,16 @@ public class HadoopFsWrapper
*
* @param from
* @param to
* @param replaceExisting if existing files should be overwritten
*
* @return true if operation succeeded, false if replaceExisting == false and destination already exists
* @return true if operation succeeded, false if destination already exists
*
* @throws IOException if trying to overwrite a non-empty directory
*/
public static boolean rename(FileSystem fs, Path from, Path to, boolean replaceExisting)
public static boolean rename(FileSystem fs, Path from, Path to)
{
try {
// Note: Using reflection instead of simpler
// fs.rename(from, to, replaceExisting ? Options.Rename.OVERWRITE : Options.Rename.NONE);
// fs.rename(from, to, Options.Rename.NONE);
// due to the issues discussed in https://github.com/druid-io/druid/pull/3787
Method renameMethod = findRenameMethodRecursively(fs.getClass());
renameMethod.invoke(fs, from, to, new Options.Rename[]{Options.Rename.NONE});

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.java.util.common.IOE;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.StringUtils;
import io.druid.segment.TestHelper;
import io.druid.storage.hdfs.HdfsDataSegmentFinder;
import io.druid.timeline.DataSegment;
@ -278,6 +279,30 @@ public class HdfsDataSegmentFinderTest
hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), false);
}
@Test
public void testPreferNewestSegment() throws Exception
{
dataSourceDir = new Path(new Path(uriBase), "/usr/replicaDataSource");
descriptor1 = new Path(dataSourceDir, StringUtils.format("interval1/v1/%d_%s_%s", 0, "older", DESCRIPTOR_JSON));
descriptor2 = new Path(dataSourceDir, StringUtils.format("interval1/v1/%d_%s_%s", 0, "newer", DESCRIPTOR_JSON));
indexZip1 = new Path(descriptor1.getParent(), StringUtils.format("%d_%s_%s", 0, "older", INDEX_ZIP));
indexZip2 = new Path(descriptor2.getParent(), StringUtils.format("%d_%s_%s", 0, "newer", INDEX_ZIP));
mapper.writeValue(fs.create(descriptor1), SEGMENT_1);
mapper.writeValue(fs.create(descriptor2), SEGMENT_1);
create(indexZip1);
Thread.sleep(1000);
create(indexZip2);
final Set<DataSegment> segments = new HdfsDataSegmentFinder(conf, mapper).findSegments(
dataSourceDir.toString(), false
);
Assert.assertEquals(1, segments.size());
Assert.assertEquals(indexZip2.toUri().getPath(), segments.iterator().next().getLoadSpec().get("path"));
}
private String getDescriptorPath(DataSegment segment)
{
final Path indexzip = new Path(String.valueOf(segment.getLoadSpec().get("path")));

View File

@ -26,13 +26,13 @@ import io.druid.java.util.common.StringUtils;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.UUID;
/**
*/
@ -129,7 +129,10 @@ public class HdfsDataSegmentKillerTest
Path interval1Dir = new Path(dataSourceDir, "intervalNew");
Path version11Dir = new Path(interval1Dir, "v1");
makePartitionDirWithIndexWitNewFormat(fs, version11Dir, 3);
Assert.assertTrue(fs.mkdirs(version11Dir));
fs.createNewFile(new Path(version11Dir, StringUtils.format("%s_index.zip", 3)));
fs.createNewFile(new Path(version11Dir, StringUtils.format("%s_descriptor.json", 3)));
killer.kill(getSegmentWithPath(new Path(version11Dir, "3_index.zip").toString()));
Assert.assertFalse(fs.exists(version11Dir));
@ -140,6 +143,43 @@ public class HdfsDataSegmentKillerTest
Assert.assertTrue(fs.delete(dataSourceDir, false));
}
@Test
public void testKillForSegmentWithUniquePath() throws Exception
{
Configuration config = new Configuration();
HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(
config,
new HdfsDataSegmentPusherConfig()
{
@Override
public String getStorageDirectory()
{
return "/tmp";
}
}
);
FileSystem fs = FileSystem.get(config);
Path dataSourceDir = new Path("/tmp/dataSourceNew");
Path interval1Dir = new Path(dataSourceDir, "intervalNew");
Path version11Dir = new Path(interval1Dir, "v1");
String uuid = UUID.randomUUID().toString().substring(0, 5);
Assert.assertTrue(fs.mkdirs(version11Dir));
fs.createNewFile(new Path(version11Dir, StringUtils.format("%s_%s_index.zip", 3, uuid)));
fs.createNewFile(new Path(version11Dir, StringUtils.format("%s_%s_descriptor.json", 3, uuid)));
killer.kill(getSegmentWithPath(new Path(version11Dir, StringUtils.format("%s_%s_index.zip", 3, uuid)).toString()));
Assert.assertFalse(fs.exists(version11Dir));
Assert.assertFalse(fs.exists(interval1Dir));
Assert.assertTrue(fs.exists(dataSourceDir));
Assert.assertTrue(fs.exists(new Path("/tmp")));
Assert.assertTrue(fs.exists(dataSourceDir));
Assert.assertTrue(fs.delete(dataSourceDir, false));
}
@Test
public void testKillNonExistingSegment() throws Exception
{
@ -161,22 +201,8 @@ public class HdfsDataSegmentKillerTest
private void makePartitionDirWithIndex(FileSystem fs, Path path) throws IOException
{
Assert.assertTrue(fs.mkdirs(path));
try (FSDataOutputStream os = fs.create(new Path(path, "index.zip")); FSDataOutputStream oos = fs.create(new Path(
path,
"descriptor.json"
))) {
}
}
private void makePartitionDirWithIndexWitNewFormat(FileSystem fs, Path path, Integer partitionNumber)
throws IOException
{
Assert.assertTrue(fs.mkdirs(path));
try (FSDataOutputStream os = fs.create(new Path(
path,
StringUtils.format("%s_index.zip", partitionNumber)
)); FSDataOutputStream oos = fs.create(new Path(path, StringUtils.format("%s_descriptor.json", partitionNumber)))) {
}
fs.createNewFile(new Path(path, "index.zip"));
fs.createNewFile(new Path(path, "descriptor.json"));
}
private DataSegment getSegmentWithPath(String path)

View File

@ -127,7 +127,8 @@ public class HdfsDataSegmentPusherTest
testUsingSchemeForMultipleSegments("file", 3);
}
private void testUsingScheme(final String scheme) throws Exception
@Test
public void testUsingUniqueFilePath() throws Exception
{
Configuration conf = new Configuration(true);
@ -142,11 +143,7 @@ public class HdfsDataSegmentPusherTest
HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
final File storageDirectory = tempFolder.newFolder();
config.setStorageDirectory(
scheme != null
? StringUtils.format("%s://%s", scheme, storageDirectory.getAbsolutePath())
: storageDirectory.getAbsolutePath()
);
config.setStorageDirectory(StringUtils.format("file://%s", storageDirectory.getAbsolutePath()));
HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
DataSegment segmentToPush = new DataSegment(
@ -163,49 +160,11 @@ public class HdfsDataSegmentPusherTest
DataSegment segment = pusher.push(segmentDir, segmentToPush, true);
String indexUri = StringUtils.format(
"%s/%s/%d_index.zip",
FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(),
pusher.getStorageDir(segmentToPush),
segmentToPush.getShardSpec().getPartitionNum()
String matcher = ".*/foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0_[A-Za-z0-9-]{36}_index\\.zip";
Assert.assertTrue(
segment.getLoadSpec().get("path").toString(),
segment.getLoadSpec().get("path").toString().matches(matcher)
);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
Assert.assertEquals(segmentToPush, segment);
Assert.assertEquals(ImmutableMap.of(
"type",
"hdfs",
"path",
indexUri
), segment.getLoadSpec());
// rename directory after push
final String segmentPath = pusher.getStorageDir(segment);
File indexFile = new File(StringUtils.format(
"%s/%s/%d_index.zip",
storageDirectory,
segmentPath,
segment.getShardSpec().getPartitionNum()
));
Assert.assertTrue(indexFile.exists());
File descriptorFile = new File(StringUtils.format(
"%s/%s/%d_descriptor.json",
storageDirectory,
segmentPath,
segment.getShardSpec().getPartitionNum()
));
Assert.assertTrue(descriptorFile.exists());
// push twice will fail and temp dir cleaned
File outDir = new File(StringUtils.format("%s/%s", config.getStorageDirectory(), segmentPath));
outDir.setReadOnly();
try {
pusher.push(segmentDir, segmentToPush, true);
}
catch (IOException e) {
Assert.fail("should not throw exception");
}
}
private void testUsingSchemeForMultipleSegments(final String scheme, final int numberOfSegments) throws Exception
@ -246,12 +205,12 @@ public class HdfsDataSegmentPusherTest
}
for (int i = 0; i < numberOfSegments; i++) {
final DataSegment pushedSegment = pusher.push(segmentDir, segments[i], true);
final DataSegment pushedSegment = pusher.push(segmentDir, segments[i], false);
String indexUri = StringUtils.format(
"%s/%s/%d_index.zip",
FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(),
pusher.getStorageDir(segments[i]),
pusher.getStorageDir(segments[i], false),
segments[i].getShardSpec().getPartitionNum()
);
@ -264,7 +223,7 @@ public class HdfsDataSegmentPusherTest
indexUri
), pushedSegment.getLoadSpec());
// rename directory after push
String segmentPath = pusher.getStorageDir(pushedSegment);
String segmentPath = pusher.getStorageDir(pushedSegment, false);
File indexFile = new File(StringUtils.format(
"%s/%s/%d_index.zip",
@ -293,7 +252,7 @@ public class HdfsDataSegmentPusherTest
indexUri
), fromDescriptorFileDataSegment.getLoadSpec());
// rename directory after push
segmentPath = pusher.getStorageDir(fromDescriptorFileDataSegment);
segmentPath = pusher.getStorageDir(fromDescriptorFileDataSegment, false);
indexFile = new File(StringUtils.format(
"%s/%s/%d_index.zip",
@ -308,7 +267,7 @@ public class HdfsDataSegmentPusherTest
File outDir = new File(StringUtils.format("%s/%s", config.getStorageDirectory(), segmentPath));
outDir.setReadOnly();
try {
pusher.push(segmentDir, segments[i], true);
pusher.push(segmentDir, segments[i], false);
}
catch (IOException e) {
Assert.fail("should not throw exception");
@ -316,6 +275,87 @@ public class HdfsDataSegmentPusherTest
}
}
private void testUsingScheme(final String scheme) throws Exception
{
Configuration conf = new Configuration(true);
// Create a mock segment on disk
File segmentDir = tempFolder.newFolder();
File tmp = new File(segmentDir, "version.bin");
final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
Files.write(data, tmp);
final long size = data.length;
HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
final File storageDirectory = tempFolder.newFolder();
config.setStorageDirectory(
scheme != null
? StringUtils.format("%s://%s", scheme, storageDirectory.getAbsolutePath())
: storageDirectory.getAbsolutePath()
);
HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
DataSegment segmentToPush = new DataSegment(
"foo",
Intervals.of("2015/2016"),
"0",
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
NoneShardSpec.instance(),
0,
size
);
DataSegment segment = pusher.push(segmentDir, segmentToPush, false);
String indexUri = StringUtils.format(
"%s/%s/%d_index.zip",
FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(),
pusher.getStorageDir(segmentToPush, false),
segmentToPush.getShardSpec().getPartitionNum()
);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
Assert.assertEquals(segmentToPush, segment);
Assert.assertEquals(ImmutableMap.of(
"type",
"hdfs",
"path",
indexUri
), segment.getLoadSpec());
// rename directory after push
final String segmentPath = pusher.getStorageDir(segment, false);
File indexFile = new File(StringUtils.format(
"%s/%s/%d_index.zip",
storageDirectory,
segmentPath,
segment.getShardSpec().getPartitionNum()
));
Assert.assertTrue(indexFile.exists());
File descriptorFile = new File(StringUtils.format(
"%s/%s/%d_descriptor.json",
storageDirectory,
segmentPath,
segment.getShardSpec().getPartitionNum()
));
Assert.assertTrue(descriptorFile.exists());
// push twice will fail and temp dir cleaned
File outDir = new File(StringUtils.format("%s/%s", config.getStorageDirectory(), segmentPath));
outDir.setReadOnly();
try {
pusher.push(segmentDir, segmentToPush, false);
}
catch (IOException e) {
Assert.fail("should not throw exception");
}
}
public static class TestObjectMapper extends ObjectMapper
{
public TestObjectMapper()
@ -371,7 +411,7 @@ public class HdfsDataSegmentPusherTest
1
);
String storageDir = hdfsDataSegmentPusher.getStorageDir(segment);
String storageDir = hdfsDataSegmentPusher.getStorageDir(segment, false);
Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version", storageDir);
}

View File

@ -1937,6 +1937,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema),
toolbox.getSegmentHandoffNotifierFactory(),
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
toolbox.getDataSegmentKiller(),
toolbox.getObjectMapper(),
metrics
);

View File

@ -2266,9 +2266,9 @@ public class KafkaIndexTaskTest
private List<String> readSegmentColumn(final String column, final SegmentDescriptor descriptor) throws IOException
{
File indexZip = new File(
File indexBasePath = new File(
StringUtils.format(
"%s/%s/%s_%s/%s/%d/index.zip",
"%s/%s/%s_%s/%s/%d",
getSegmentDirectory(),
DATA_SCHEMA.getDataSource(),
descriptor.getInterval().getStart(),
@ -2277,6 +2277,7 @@ public class KafkaIndexTaskTest
descriptor.getPartitionNumber()
)
);
File outputLocation = new File(
directory,
StringUtils.format(
@ -2289,7 +2290,7 @@ public class KafkaIndexTaskTest
);
outputLocation.mkdir();
CompressionUtils.unzip(
Files.asByteSource(indexZip),
Files.asByteSource(new File(indexBasePath.listFiles()[0], "index.zip")),
outputLocation,
Predicates.<Throwable>alwaysFalse(),
false

View File

@ -27,8 +27,8 @@ import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentFinder;
@ -37,9 +37,11 @@ import io.druid.timeline.DataSegment;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class S3DataSegmentFinder implements DataSegmentFinder
{
@ -64,7 +66,7 @@ public class S3DataSegmentFinder implements DataSegmentFinder
@Override
public Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException
{
final Set<DataSegment> segments = Sets.newHashSet();
final Map<String, Pair<DataSegment, Long>> timestampedSegments = new HashMap<>();
try {
final Iterator<S3ObjectSummary> objectSummaryIterator = S3Utils.objectSummaryIterator(
@ -107,7 +109,12 @@ public class S3DataSegmentFinder implements DataSegmentFinder
s3Client.putObject(config.getBucket(), descriptorJson, bais, objectMetadata);
}
}
segments.add(dataSegment);
DataSegmentFinder.putInMapRetainingNewest(
timestampedSegments,
dataSegment,
objectMetadata.getLastModified() == null ? 0 : objectMetadata.getLastModified().getTime()
);
}
} else {
throw new SegmentLoadingException(
@ -128,6 +135,6 @@ public class S3DataSegmentFinder implements DataSegmentFinder
Throwables.propagateIfInstanceOf(e, SegmentLoadingException.class);
Throwables.propagate(e);
}
return segments;
return timestampedSegments.values().stream().map(x -> x.lhs).collect(Collectors.toSet());
}
}
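Because unique paths allow more than one copy of the same segment identifier to exist in deep storage (for example, a replica task or a retried push), the finder now collapses duplicates and keeps the most recently modified copy instead of adding everything to a plain set. A condensed sketch of that call-site pattern, where the discovered (segment, lastModified) pairs stand in for whatever the storage-specific listing produced (the wrapper class is illustrative):

import io.druid.java.util.common.Pair;
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.timeline.DataSegment;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class NewestWinsSketch
{
  // Collapse duplicate segment identifiers, preferring the copy with the newest modified timestamp.
  static Set<DataSegment> dedupe(Map<DataSegment, Long> discovered)
  {
    final Map<String, Pair<DataSegment, Long>> timestampedSegments = new HashMap<>();
    discovered.forEach(
        (segment, modifiedAt) -> DataSegmentFinder.putInMapRetainingNewest(timestampedSegments, segment, modifiedAt)
    );
    return timestampedSegments.values().stream().map(x -> x.lhs).collect(Collectors.toSet());
  }
}

The same pattern appears again in LocalDataSegmentFinder further below, using the index.zip file's lastModified() as the timestamp.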

View File

@ -74,8 +74,11 @@ public class S3DataSegmentMover implements DataSegmentMover
final String targetS3Bucket = MapUtils.getString(targetLoadSpec, "bucket");
final String targetS3BaseKey = MapUtils.getString(targetLoadSpec, "baseKey");
final String targetS3Path = S3Utils.constructSegmentPath(targetS3BaseKey, DataSegmentPusher.getDefaultStorageDir(segment));
String targetS3DescriptorPath = S3Utils.descriptorPathForSegmentPath(targetS3Path);
final String targetS3Path = S3Utils.constructSegmentPath(
targetS3BaseKey,
DataSegmentPusher.getDefaultStorageDir(segment, false)
);
final String targetS3DescriptorPath = S3Utils.descriptorPathForSegmentPath(targetS3Path);
if (targetS3Bucket.isEmpty()) {
throw new SegmentLoadingException("Target S3 bucket is not specified");

View File

@ -50,11 +50,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
private final ObjectMapper jsonMapper;
@Inject
public S3DataSegmentPusher(
AmazonS3 s3Client,
S3DataSegmentPusherConfig config,
ObjectMapper jsonMapper
)
public S3DataSegmentPusher(AmazonS3 s3Client, S3DataSegmentPusherConfig config, ObjectMapper jsonMapper)
{
this.s3Client = s3Client;
this.config = config;
@ -86,10 +82,10 @@ public class S3DataSegmentPusher implements DataSegmentPusher
}
@Override
public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean replaceExisting)
public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean useUniquePath)
throws IOException
{
final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), getStorageDir(inSegment));
final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), getStorageDir(inSegment, useUniquePath));
log.info("Copying segment[%s] to S3 at location[%s]", inSegment.getIdentifier(), s3Path);
@ -108,13 +104,12 @@ public class S3DataSegmentPusher implements DataSegmentPusher
try {
return S3Utils.retryS3Operation(
() -> {
uploadFileIfPossible(s3Client, config.getBucket(), s3Path, zipOutFile, replaceExisting);
uploadFileIfPossible(s3Client, config.getBucket(), s3Path, zipOutFile);
uploadFileIfPossible(
s3Client,
config.getBucket(),
S3Utils.descriptorPathForSegmentPath(s3Path),
descriptorFile,
replaceExisting
descriptorFile
);
return outSegment;
@ -160,26 +155,14 @@ public class S3DataSegmentPusher implements DataSegmentPusher
);
}
private void uploadFileIfPossible(
AmazonS3 s3Client,
String bucket,
String key,
File file,
boolean replaceExisting
)
private void uploadFileIfPossible(AmazonS3 s3Client, String bucket, String key, File file)
{
if (!replaceExisting && S3Utils.isObjectInBucketIgnoringPermission(s3Client, bucket, key)) {
log.info("Skipping push because key [%s] exists && replaceExisting == false", key);
} else {
final PutObjectRequest indexFilePutRequest = new PutObjectRequest(bucket, key, file);
final PutObjectRequest indexFilePutRequest = new PutObjectRequest(bucket, key, file);
if (!config.getDisableAcl()) {
indexFilePutRequest.setAccessControlList(
S3Utils.grantFullControlToBucketOwner(s3Client, bucket)
);
}
log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key);
s3Client.putObject(indexFilePutRequest);
if (!config.getDisableAcl()) {
indexFilePutRequest.setAccessControlList(S3Utils.grantFullControlToBucketOwner(s3Client, bucket));
}
log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key);
s3Client.putObject(indexFilePutRequest);
}
}
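The pusher now derives the S3 key from getStorageDir(inSegment, useUniquePath), and the old replaceExisting existence check is gone: when useUniquePath is true, concurrent pushes of the same identifier land on distinct keys and can never collide, and when it is false the caller explicitly wants overwrite semantics. A rough sketch of the two key shapes, assuming the unique variant appends a random UUID directory below the partition directory (consistent with the 36-character pattern asserted in the pusher test further below; class and method names are illustrative):

import io.druid.java.util.common.StringUtils;

import java.util.UUID;

final class SegmentKeySketch
{
  // Default layout: baseKey/dataSource/interval/version/partitionNum/index.zip
  static String defaultKey(String baseKey, String storageDir)
  {
    return StringUtils.format("%s/%s/index.zip", baseKey, storageDir);
  }

  // Assumed unique layout: baseKey/dataSource/interval/version/partitionNum/<uuid>/index.zip
  static String uniqueKey(String baseKey, String storageDir)
  {
    return StringUtils.format("%s/%s/%s/index.zip", baseKey, storageDir, UUID.randomUUID().toString());
  }
}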

View File

@ -35,10 +35,9 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.StringUtils;
import io.druid.segment.TestHelper;
@ -61,6 +60,7 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -216,31 +216,51 @@ public class S3DataSegmentFinderTest
final String serializedSegment4_0 = mapper.writeValueAsString(updatedSegment4_0);
final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1);
Assert.assertNotEquals(serializedSegment1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getObjectContent()));
Assert.assertNotEquals(serializedSegment2,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getObjectContent()));
Assert.assertNotEquals(serializedSegment3,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getObjectContent()));
Assert.assertNotEquals(serializedSegment4_0,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getObjectContent()));
Assert.assertNotEquals(serializedSegment4_1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getObjectContent()));
Assert.assertNotEquals(
serializedSegment1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getObjectContent())
);
Assert.assertNotEquals(
serializedSegment2,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getObjectContent())
);
Assert.assertNotEquals(
serializedSegment3,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getObjectContent())
);
Assert.assertNotEquals(
serializedSegment4_0,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getObjectContent())
);
Assert.assertNotEquals(
serializedSegment4_1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getObjectContent())
);
final Set<DataSegment> segments2 = s3DataSegmentFinder.findSegments("", true);
Assert.assertEquals(segments, segments2);
Assert.assertEquals(serializedSegment1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getObjectContent()));
Assert.assertEquals(serializedSegment2,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getObjectContent()));
Assert.assertEquals(serializedSegment3,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getObjectContent()));
Assert.assertEquals(serializedSegment4_0,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getObjectContent()));
Assert.assertEquals(serializedSegment4_1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getObjectContent()));
Assert.assertEquals(
serializedSegment1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getObjectContent())
);
Assert.assertEquals(
serializedSegment2,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getObjectContent())
);
Assert.assertEquals(
serializedSegment3,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getObjectContent())
);
Assert.assertEquals(
serializedSegment4_0,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getObjectContent())
);
Assert.assertEquals(
serializedSegment4_1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getObjectContent())
);
}
@Test(expected = SegmentLoadingException.class)
@ -274,9 +294,7 @@ public class S3DataSegmentFinderTest
public void testFindSegmentsUpdateLoadSpec() throws Exception
{
config.setBucket("amazing");
final DataSegment segmentMissingLoadSpec = DataSegment.builder(SEGMENT_1)
.loadSpec(ImmutableMap.of())
.build();
final DataSegment segmentMissingLoadSpec = DataSegment.builder(SEGMENT_1).loadSpec(ImmutableMap.of()).build();
final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
final String segmentPath = baseKey + "/interval_missing_load_spec/v1/1/";
final String descriptorPath = S3Utils.descriptorPathForSegmentPath(segmentPath);
@ -297,6 +315,36 @@ public class S3DataSegmentFinderTest
Assert.assertEquals(indexPath, testLoadSpec.get("key"));
}
@Test
public void testPreferNewestSegment() throws Exception
{
baseKey = "replicaDataSource";
config = new S3DataSegmentPusherConfig();
config.setBucket(bucket);
config.setBaseKey(baseKey);
descriptor1 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval10/v1/0/older/");
descriptor2 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval10/v1/0/newer/");
indexZip1 = S3Utils.indexZipForSegmentPath(descriptor1);
indexZip2 = S3Utils.indexZipForSegmentPath(descriptor2);
mockS3Client.putObject(bucket, descriptor1, mapper.writeValueAsString(SEGMENT_1));
mockS3Client.putObject(bucket, indexZip1, "dummy");
Thread.sleep(1000);
mockS3Client.putObject(bucket, descriptor2, mapper.writeValueAsString(SEGMENT_1));
mockS3Client.putObject(bucket, indexZip2, "dummy");
final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
final Set<DataSegment> segments = s3DataSegmentFinder.findSegments("", false);
Assert.assertEquals(1, segments.size());
Assert.assertEquals(indexZip2, segments.iterator().next().getLoadSpec().get("key"));
}
private String getDescriptorPath(DataSegment segment)
{
return S3Utils.descriptorPathForSegmentPath(String.valueOf(segment.getLoadSpec().get("key")));
@ -305,7 +353,7 @@ public class S3DataSegmentFinderTest
private static class MockAmazonS3Client extends AmazonS3Client
{
private final File baseDir;
private final Map<String, Set<String>> storage = Maps.newHashMap();
private final Map<String, Map<String, ObjectMetadata>> storage = Maps.newHashMap();
public MockAmazonS3Client(File baseDir)
{
@ -316,9 +364,9 @@ public class S3DataSegmentFinderTest
@Override
public boolean doesObjectExist(String bucketName, String objectName)
{
final Set<String> keys = storage.get(bucketName);
final Map<String, ObjectMetadata> keys = storage.get(bucketName);
if (keys != null) {
return keys.contains(objectName);
return keys.keySet().contains(objectName);
}
return false;
}
@ -329,7 +377,9 @@ public class S3DataSegmentFinderTest
final String bucketName = listObjectsV2Request.getBucketName();
final String prefix = listObjectsV2Request.getPrefix();
final List<String> keysOrigin = Lists.newArrayList(storage.get(bucketName));
final List<String> keysOrigin = storage.get(bucketName) == null
? ImmutableList.of()
: new ArrayList<>(storage.get(bucketName).keySet());
Predicate<String> prefixFilter = new Predicate<String>()
{
@ -387,7 +437,7 @@ public class S3DataSegmentFinderTest
throw ex;
}
if (!storage.get(bucketName).contains(objectKey)) {
if (!storage.get(bucketName).keySet().contains(objectKey)) {
AmazonServiceException ex = new AmazonS3Exception("S3DataSegmentFinderTest");
ex.setStatusCode(404);
ex.setErrorCode("NoSuchKey");
@ -398,6 +448,7 @@ public class S3DataSegmentFinderTest
S3Object storageObject = new S3Object();
storageObject.setBucketName(bucketName);
storageObject.setKey(objectKey);
storageObject.setObjectMetadata(storage.get(bucketName).get(objectKey));
try {
storageObject.setObjectContent(new FileInputStream(objectPath));
}
@ -414,16 +465,19 @@ public class S3DataSegmentFinderTest
@Override
public PutObjectResult putObject(String bucketName, String key, String data)
{
return putObject(bucketName, key, new ByteArrayInputStream(StringUtils.toUtf8(data)), null);
ObjectMetadata metadata = new ObjectMetadata();
metadata.setLastModified(DateTimes.nowUtc().toDate());
return putObject(bucketName, key, new ByteArrayInputStream(StringUtils.toUtf8(data)), metadata);
}
@Override
public PutObjectResult putObject(String bucketName, String key, InputStream input, ObjectMetadata metadata)
{
if (!storage.containsKey(bucketName)) {
storage.put(bucketName, Sets.newHashSet());
storage.put(bucketName, new HashMap<>());
}
storage.get(bucketName).add(key);
storage.get(bucketName).put(key, metadata);
final File objectPath = new File(baseDir, key);

View File

@ -71,6 +71,17 @@ public class S3DataSegmentPusherTest
@Test
public void testPush() throws Exception
{
testPushInternal(false, "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/index\\.zip");
}
@Test
public void testPushUseUniquePath() throws Exception
{
testPushInternal(true, "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip");
}
private void testPushInternal(boolean useUniquePath, String matcher) throws Exception
{
AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
@ -131,14 +142,15 @@ public class S3DataSegmentPusherTest
size
);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, useUniquePath);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
Assert.assertEquals(1, (int) segment.getBinaryVersion());
Assert.assertEquals("bucket", segment.getLoadSpec().get("bucket"));
Assert.assertEquals(
"key/foo/2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z/0/0/index.zip",
segment.getLoadSpec().get("key"));
Assert.assertTrue(
segment.getLoadSpec().get("key").toString(),
segment.getLoadSpec().get("key").toString().matches(matcher)
);
Assert.assertEquals("s3_zip", segment.getLoadSpec().get("type"));
// Verify that the pushed S3Object contains the correct data

View File

@ -366,7 +366,7 @@ public class JobHelper
return succeeded;
}
public static boolean runJobs(List<Jobby> jobs, HadoopDruidIndexerConfig config)
{
boolean succeeded = true;

View File

@ -199,10 +199,7 @@ public class YeOldePlumberSchool implements PlumberSchool
.withDimensions(ImmutableList.copyOf(mappedSegment.getAvailableDimensions()))
.withBinaryVersion(SegmentUtils.getVersionFromDir(fileToUpload));
// This plumber is only used in batch ingestion situations where you do not have replica tasks pushing
// segments with the same identifier but potentially different contents. In case of conflict, favor the most
// recently pushed segment (replaceExisting == true).
dataSegmentPusher.push(fileToUpload, segmentToUpload, true);
dataSegmentPusher.push(fileToUpload, segmentToUpload, false);
log.info(
"Uploaded segment[%s]",

View File

@ -709,6 +709,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema),
toolbox.getSegmentHandoffNotifierFactory(),
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
toolbox.getDataSegmentKiller(),
toolbox.getObjectMapper(),
metrics
);

View File

@ -441,10 +441,7 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
// version, but is "newer" than said original version.
DataSegment updatedSegment = segment.withVersion(StringUtils.format("%s_v%s", segment.getVersion(), outVersion));
// The convert segment task does not support replicas where different tasks could generate segments with the
// same identifier but potentially different contents. In case of conflict, favor the most recently pushed
// segment (replaceExisting == true).
updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment, true);
updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment, false);
actionClient.submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
} else {

View File

@ -1105,7 +1105,8 @@ public class IndexTask extends AbstractTask implements ChatHandler
return new BatchAppenderatorDriver(
appenderator,
segmentAllocator,
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient())
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
toolbox.getDataSegmentKiller()
);
}

View File

@ -186,10 +186,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
// Upload file
// The merge task does not support replicas where different tasks could generate segments with the
// same identifier but potentially different contents. In case of conflict, favor the most recently pushed
// segment (replaceExisting == true).
final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment, true);
final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment, false);
emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
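ConvertSegmentTask, MergeTaskBase and YeOldePlumberSchool all move from push(..., true) meaning replaceExisting to push(..., false) meaning useUniquePath: these are single-writer batch code paths with no replica tasks, so a stable, non-unique location is sufficient and re-runs simply overwrite the previous files. A minimal sketch of that batch-side sequence using the toolbox accessors seen elsewhere in this commit (the helper class is illustrative, not part of the patch):

import com.google.common.collect.Sets;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.timeline.DataSegment;

import java.io.File;
import java.io.IOException;

final class BatchPushSketch
{
  // Push a merged or converted segment to a stable (non-unique) location, then register it with the metadata store.
  static DataSegment pushAndPublish(TaskToolbox toolbox, File segmentDir, DataSegment segment) throws IOException
  {
    final DataSegment pushed = toolbox.getSegmentPusher().push(segmentDir, segment, false);
    toolbox.getTaskActionClient().submit(new SegmentInsertAction(Sets.newHashSet(pushed)));
    return pushed;
  }
}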

View File

@ -72,6 +72,7 @@ import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
@ -152,7 +153,7 @@ public class IndexTaskTest
}
@After
public void teardown() throws IOException
public void teardown()
{
reportsFile.delete();
}
@ -1424,7 +1425,7 @@ public class IndexTaskTest
}
@Override
public DataSegment push(File file, DataSegment segment, boolean replaceExisting)
public DataSegment push(File file, DataSegment segment, boolean useUniquePath)
{
segments.add(segment);
return segment;
@ -1437,12 +1438,27 @@ public class IndexTaskTest
}
};
final DataSegmentKiller killer = new DataSegmentKiller()
{
@Override
public void kill(DataSegment segment)
{
}
@Override
public void killAll()
{
}
};
final TaskToolbox box = new TaskToolbox(
null,
actionClient,
null,
pusher,
null,
killer,
null,
null,
null,

View File

@ -199,7 +199,7 @@ public class SameIntervalMergeTaskTest
}
@Override
public DataSegment push(File file, DataSegment segment, boolean replaceExisting)
public DataSegment push(File file, DataSegment segment, boolean useUniquePath)
{
// the merged segment is pushed to storage
segments.add(segment);

View File

@ -249,7 +249,7 @@ public class IngestSegmentFirehoseFactoryTest
}
@Override
public DataSegment push(File file, DataSegment segment, boolean replaceExisting)
public DataSegment push(File file, DataSegment segment, boolean useUniquePath)
{
return segment;
}

View File

@ -482,7 +482,7 @@ public class TaskLifecycleTest
}
@Override
public DataSegment push(File file, DataSegment segment, boolean replaceExisting)
public DataSegment push(File file, DataSegment segment, boolean useUniquePath)
{
pushedSegments++;
return segment;
@ -1071,7 +1071,7 @@ public class TaskLifecycleTest
}
@Override
public DataSegment push(File file, DataSegment dataSegment, boolean replaceExisting)
public DataSegment push(File file, DataSegment dataSegment, boolean useUniquePath)
{
throw new RuntimeException("FAILURE");
}

View File

@ -46,7 +46,7 @@ public class TestDataSegmentPusher implements DataSegmentPusher
}
@Override
public DataSegment push(File file, DataSegment segment, boolean replaceExisting)
public DataSegment push(File file, DataSegment segment, boolean useUniquePath)
{
pushedSegments.add(segment);
return segment;

View File

@ -20,24 +20,24 @@
package io.druid.segment.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.druid.guice.LocalDataStorageDruidModule;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.logger.Logger;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
*/
public class LocalDataSegmentFinder implements DataSegmentFinder
{
private static final Logger log = new Logger(LocalDataSegmentFinder.class);
private final ObjectMapper mapper;
@ -49,25 +49,26 @@ public class LocalDataSegmentFinder implements DataSegmentFinder
}
@Override
public Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor)
throws SegmentLoadingException
public Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException
{
final Map<String, Pair<DataSegment, Long>> timestampedSegments = new HashMap<>();
final Set<DataSegment> segments = Sets.newHashSet();
final File workingDir = new File(workingDirPath);
if (!workingDir.isDirectory()) {
throw new SegmentLoadingException("Working directory [%s] didn't exist !?", workingDir);
}
recursiveSearchSegments(segments, workingDir, updateDescriptor);
return segments;
recursiveSearchSegments(timestampedSegments, workingDir, updateDescriptor);
return timestampedSegments.values().stream().map(x -> x.lhs).collect(Collectors.toSet());
}
private void recursiveSearchSegments(Set<DataSegment> segments, File workingDir, boolean updateDescriptor)
throws SegmentLoadingException
private void recursiveSearchSegments(
Map<String, Pair<DataSegment, Long>> timestampedSegments, File workingDir, boolean updateDescriptor
) throws SegmentLoadingException
{
for (File file : workingDir.listFiles()) {
if (file.isDirectory()) {
recursiveSearchSegments(segments, file, updateDescriptor);
recursiveSearchSegments(timestampedSegments, file, updateDescriptor);
} else if (file.getName().equals("descriptor.json")) {
final File indexZip = new File(file.getParentFile(), "index.zip");
if (indexZip.exists()) {
@ -88,7 +89,8 @@ public class LocalDataSegmentFinder implements DataSegmentFinder
FileUtils.writeStringToFile(file, mapper.writeValueAsString(dataSegment));
}
}
segments.add(dataSegment);
DataSegmentFinder.putInMapRetainingNewest(timestampedSegments, dataSegment, indexZip.lastModified());
}
catch (IOException e) {
throw new SegmentLoadingException(

View File

@ -52,19 +52,21 @@ public class LocalDataSegmentKiller implements DataSegmentKiller
try {
if (path.getName().endsWith(".zip")) {
// path format --> .../dataSource/interval/version/partitionNum/xxx.zip
File partitionNumDir = path.getParentFile();
FileUtils.deleteDirectory(partitionNumDir);
// or .../dataSource/interval/version/partitionNum/UUID/xxx.zip
// try to delete other directories if possible
File versionDir = partitionNumDir.getParentFile();
if (versionDir.delete()) {
File intervalDir = versionDir.getParentFile();
if (intervalDir.delete()) {
File dataSourceDir = intervalDir.getParentFile();
dataSourceDir.delete();
File parentDir = path.getParentFile();
FileUtils.deleteDirectory(parentDir);
// possibly recursively delete empty parent directories up to 'dataSource'
parentDir = parentDir.getParentFile();
int maxDepth = 4; // if for some reason there's no dataSource directory, stop recursing somewhere reasonable
while (parentDir != null && --maxDepth >= 0) {
if (!parentDir.delete() || segment.getDataSource().equals(parentDir.getName())) {
break;
}
parentDir = parentDir.getParentFile();
}
} else {
throw new SegmentLoadingException("Unknown file type[%s]", path);
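A unique path adds one extra directory level below the partition directory, so the killer can no longer delete a fixed set of ancestors; instead it removes the directory containing the zip and then walks upward, deleting parents only while they are empty and stopping at the dataSource directory or after a bounded number of levels. The loop above restated as a standalone helper (illustrative names; File.delete() returns false for non-empty directories, which is what makes the walk safe):

import java.io.File;

final class EmptyParentCleanupSketch
{
  // Delete empty ancestors of 'deletedDir', stopping at the dataSource directory or after maxDepth levels.
  static void deleteEmptyParents(File deletedDir, String dataSource, int maxDepth)
  {
    File parent = deletedDir.getParentFile();
    while (parent != null && maxDepth-- > 0) {
      // delete() fails on non-empty directories, so live data is never removed here.
      if (!parent.delete() || dataSource.equals(parent.getName())) {
        break;
      }
      parent = parent.getParentFile();
    }
  }
}

The new LocalDataSegmentKillerTest.testKillUniquePath further below exercises exactly this layout, asserting that the UUID, partition, version, interval and dataSource directories are all gone after kill().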

View File

@ -32,7 +32,6 @@ import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Map;
@ -71,10 +70,11 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
}
@Override
public DataSegment push(File dataSegmentFile, DataSegment segment, boolean replaceExisting) throws IOException
public DataSegment push(final File dataSegmentFile, final DataSegment segment, final boolean useUniquePath)
throws IOException
{
final File baseStorageDir = config.getStorageDirectory();
final File outDir = new File(baseStorageDir, this.getStorageDir(segment));
final File outDir = new File(baseStorageDir, this.getStorageDir(segment, useUniquePath));
log.info("Copying segment[%s] to local filesystem at location[%s]", segment.getIdentifier(), outDir.toString());
@ -109,31 +109,15 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
);
FileUtils.forceMkdir(outDir);
if (replaceExisting) {
final File indexFileTarget = new File(outDir, tmpIndexFile.getName());
final File descriptorFileTarget = new File(outDir, tmpDescriptorFile.getName());
final File indexFileTarget = new File(outDir, tmpIndexFile.getName());
final File descriptorFileTarget = new File(outDir, tmpDescriptorFile.getName());
if (!tmpIndexFile.renameTo(indexFileTarget)) {
throw new IOE("Failed to rename [%s] to [%s]", tmpIndexFile, indexFileTarget);
}
if (!tmpIndexFile.renameTo(indexFileTarget)) {
throw new IOE("Failed to rename [%s] to [%s]", tmpIndexFile, indexFileTarget);
}
if (!tmpDescriptorFile.renameTo(descriptorFileTarget)) {
throw new IOE("Failed to rename [%s] to [%s]", tmpDescriptorFile, descriptorFileTarget);
}
} else {
try {
Files.move(tmpIndexFile.toPath(), outDir.toPath().resolve(tmpIndexFile.toPath().getFileName()));
}
catch (FileAlreadyExistsException e) {
log.info("[%s] already exists at [%s], ignore if replication is configured", INDEX_FILENAME, outDir);
}
try {
Files.move(tmpDescriptorFile.toPath(), outDir.toPath().resolve(tmpDescriptorFile.toPath().getFileName()));
}
catch (FileAlreadyExistsException e) {
log.info("[%s] already exists at [%s], ignore if replication is configured", DESCRIPTOR_FILENAME, outDir);
dataSegment = jsonMapper.readValue(new File(outDir, DESCRIPTOR_FILENAME), DataSegment.class);
}
if (!tmpDescriptorFile.renameTo(descriptorFileTarget)) {
throw new IOE("Failed to rename [%s] to [%s]", tmpDescriptorFile, descriptorFileTarget);
}
return dataSegment;

View File

@ -91,10 +91,10 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
return findStorageLocationIfLoaded(segment) != null;
}
public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
private StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
{
for (StorageLocation location : getSortedList(locations)) {
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment));
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
if (localStorageDir.exists()) {
return location;
}
@ -127,7 +127,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
StorageLocation loc = findStorageLocationIfLoaded(segment);
String storageDir = DataSegmentPusher.getDefaultStorageDir(segment);
String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false);
if (loc == null) {
loc = loadSegmentWithRetry(segment, storageDir);
@ -226,18 +226,17 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
// in this case, findStorageLocationIfLoaded() will think the segment is located in the failed storageDir even though it is not.
// So we should always clean all possible locations here
for (StorageLocation location : getSortedList(locations)) {
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment));
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
if (localStorageDir.exists()) {
// Druid creates folders of the form dataSource/interval/version/partitionNum.
// We need to clean up all these directories if they are all empty.
File cacheFile = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment));
cleanupCacheFiles(location.getPath(), cacheFile);
cleanupCacheFiles(location.getPath(), localStorageDir);
location.removeSegment(segment);
}
}
}
public void cleanupCacheFiles(File baseFile, File cacheFile)
private void cleanupCacheFiles(File baseFile, File cacheFile)
{
if (cacheFile.equals(baseFile)) {
return;
@ -262,7 +261,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
}
}
public List<StorageLocation> getSortedList(List<StorageLocation> locs)
private List<StorageLocation> getSortedList(List<StorageLocation> locs)
{
List<StorageLocation> locations = new ArrayList<>(locs);
Collections.sort(locations, COMPARATOR);
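Note that unique paths only affect deep storage: the local segment cache keeps its fixed dataSource/interval/version/partitionNum layout, which is why every lookup here passes getDefaultStorageDir(segment, false); where to download from is determined by the segment's loadSpec, not by the cache path. A small sketch of that cache-path computation (simplified; the wrapper is illustrative):

import io.druid.segment.loading.DataSegmentPusher;
import io.druid.timeline.DataSegment;

import java.io.File;

final class LocalCachePathSketch
{
  // The local cache location is never UUID-suffixed, regardless of where the segment lives in deep storage.
  static File cacheDirFor(File locationPath, DataSegment segment)
  {
    return new File(locationPath, DataSegmentPusher.getDefaultStorageDir(segment, false));
  }
}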

View File

@ -195,11 +195,16 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
*
* @param identifiers list of segments to push
* @param committer a committer associated with all data that has been added so far
* @param useUniquePath true if the segment should be written to a path with a unique identifier
*
* @return future that resolves when all segments have been pushed. The segment list will be the list of segments
* that have been pushed and the commit metadata from the Committer.
*/
ListenableFuture<SegmentsAndMetadata> push(Collection<SegmentIdentifier> identifiers, @Nullable Committer committer);
ListenableFuture<SegmentsAndMetadata> push(
Collection<SegmentIdentifier> identifiers,
@Nullable Committer committer,
boolean useUniquePath
);
/**
* Stop any currently-running processing and clean up after ourselves. This allows currently running persists and pushes
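Callers of Appenderator.push now choose the path behaviour explicitly: the streaming driver passes true (replicas and retries must never overwrite each other), while the batch driver and AppenderatorPlumber pass false, as the later hunks in this commit show. A hedged usage sketch (committer handling elided; the wrapper class is illustrative):

import com.google.common.util.concurrent.ListenableFuture;
import io.druid.data.input.Committer;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;

import java.util.Collection;

final class PushUsageSketch
{
  // Streaming-style push: every call writes its segments to fresh, unique locations in deep storage.
  static ListenableFuture<SegmentsAndMetadata> pushForStreaming(
      Appenderator appenderator,
      Collection<SegmentIdentifier> identifiers,
      Committer committer
  )
  {
    return appenderator.push(identifiers, committer, true);
  }
}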

View File

@ -509,7 +509,8 @@ public class AppenderatorImpl implements Appenderator
@Override
public ListenableFuture<SegmentsAndMetadata> push(
final Collection<SegmentIdentifier> identifiers,
@Nullable final Committer committer
@Nullable final Committer committer,
final boolean useUniquePath
)
{
final Map<SegmentIdentifier, Sink> theSinks = Maps.newHashMap();
@ -533,7 +534,7 @@ public class AppenderatorImpl implements Appenderator
continue;
}
final DataSegment dataSegment = mergeAndPush(entry.getKey(), entry.getValue());
final DataSegment dataSegment = mergeAndPush(entry.getKey(), entry.getValue(), useUniquePath);
if (dataSegment != null) {
dataSegments.add(dataSegment);
} else {
@ -563,17 +564,13 @@ public class AppenderatorImpl implements Appenderator
* Merge segment, push to deep storage. Should only be used on segments that have been fully persisted. Must only
* be run in the single-threaded pushExecutor.
*
* Note that this calls DataSegmentPusher.push() with replaceExisting == true which is appropriate for the indexing
* tasks it is currently being used for (local indexing and Kafka indexing). If this is going to be used by an
* indexing task type that requires replaceExisting == false, this setting will need to be pushed to the caller.
*
* @param identifier sink identifier
* @param sink sink to push
* @param useUniquePath true if the segment should be written to a path with a unique identifier
*
* @return segment descriptor, or null if the sink is no longer valid
*/
private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink sink)
private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink sink, final boolean useUniquePath)
{
// Bail out if this sink is null or otherwise not what we expect.
if (sinks.get(identifier) != sink) {
@ -645,17 +642,12 @@ public class AppenderatorImpl implements Appenderator
// Retry pushing segments because uploading to deep storage might fail especially for cloud storage types
final DataSegment segment = RetryUtils.retry(
// The appenderator is currently being used for the local indexing task and the Kafka indexing task. For the
// Kafka indexing task, pushers MUST overwrite any existing objects in deep storage with the same identifier
// in order to maintain exactly-once semantics. If they do not and instead favor existing objects, in case of
// failure during publishing, the indexed data may not represent the checkpointed state and data loss or
// duplication may occur. See: https://github.com/druid-io/druid/issues/5161. The local indexing task does not
// support replicas where different tasks could generate segments with the same identifier but potentially
// different contents so it is okay if existing objects are overwritten. In both of these cases, we want to
// favor the most recently pushed segment so replaceExisting == true.
// Kafka indexing task, pushers must use unique file paths in deep storage in order to maintain exactly-once
// semantics.
() -> dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)),
true
useUniquePath
),
exception -> exception instanceof Exception,
5
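mergeAndPush keeps the existing retry wrapper around the deep-storage upload; with useUniquePath true, a retried attempt may land at a brand-new location, which is harmless because only the DataSegment returned by the successful attempt ends up in the published metadata. A sketch of that retry-around-push shape (the hard-coded arguments are illustrative):

import io.druid.java.util.common.RetryUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.timeline.DataSegment;

import java.io.File;

final class RetryingPushSketch
{
  // Retry the upload a few times; with unique paths, a retry never clobbers the output of an earlier attempt.
  static DataSegment pushWithRetries(DataSegmentPusher pusher, File mergedFile, DataSegment segment) throws Exception
  {
    return RetryUtils.retry(
        () -> pusher.push(mergedFile, segment, true),
        exception -> exception instanceof Exception,
        5
    );
  }
}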

View File

@ -463,7 +463,7 @@ public class AppenderatorPlumber implements Plumber
// WARNING: Committers.nil() here means that on-disk data can get out of sync with committing.
Futures.addCallback(
appenderator.push(segmentsToPush, Committers.nil()),
appenderator.push(segmentsToPush, Committers.nil(), false),
new FutureCallback<SegmentsAndMetadata>()
{
@Override

View File

@ -39,6 +39,7 @@ import io.druid.data.input.InputRow;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import org.joda.time.DateTime;
@ -128,6 +129,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
private final SegmentAllocator segmentAllocator;
private final UsedSegmentChecker usedSegmentChecker;
private final DataSegmentKiller dataSegmentKiller;
protected final Appenderator appenderator;
// sequenceName -> segmentsForSequence
@ -141,12 +143,14 @@ public abstract class BaseAppenderatorDriver implements Closeable
BaseAppenderatorDriver(
Appenderator appenderator,
SegmentAllocator segmentAllocator,
UsedSegmentChecker usedSegmentChecker
UsedSegmentChecker usedSegmentChecker,
DataSegmentKiller dataSegmentKiller
)
{
this.appenderator = Preconditions.checkNotNull(appenderator, "appenderator");
this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, "segmentAllocator");
this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker");
this.dataSegmentKiller = Preconditions.checkNotNull(dataSegmentKiller, "dataSegmentKiller");
this.executor = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-%d"));
}
@ -332,24 +336,32 @@ public abstract class BaseAppenderatorDriver implements Closeable
*
* @param wrappedCommitter should not be null if you want to persist intermediate states
* @param segmentIdentifiers identifiers of the segments to be pushed
* @param useUniquePath true if the segment should be written to a path with a unique identifier
*
* @return a future for pushing segments
*/
ListenableFuture<SegmentsAndMetadata> pushInBackground(
@Nullable final WrappedCommitter wrappedCommitter,
final Collection<SegmentIdentifier> segmentIdentifiers
final Collection<SegmentIdentifier> segmentIdentifiers,
final boolean useUniquePath
)
{
log.info("Pushing segments in background: [%s]", Joiner.on(", ").join(segmentIdentifiers));
return Futures.transform(
appenderator.push(segmentIdentifiers, wrappedCommitter),
appenderator.push(segmentIdentifiers, wrappedCommitter, useUniquePath),
(Function<SegmentsAndMetadata, SegmentsAndMetadata>) segmentsAndMetadata -> {
// Sanity check
final Set<SegmentIdentifier> pushedSegments = segmentsAndMetadata.getSegments().stream()
.map(SegmentIdentifier::fromDataSegment)
.collect(Collectors.toSet());
if (!pushedSegments.equals(Sets.newHashSet(segmentIdentifiers))) {
log.warn(
"Removing segments from deep storage because sanity check failed: %s", segmentsAndMetadata.getSegments()
);
segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
throw new ISE(
"WTF?! Pushed different segments than requested. Pushed[%s], requested[%s].",
pushedSegments,
@ -437,13 +449,25 @@ public abstract class BaseAppenderatorDriver implements Closeable
.collect(Collectors.toSet());
if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers)
.equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
log.info(
"Removing our segments from deep storage because someone else already published them: %s",
segmentsAndMetadata.getSegments()
);
segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
log.info("Our segments really do exist, awaiting handoff.");
} else {
throw new ISE("Failed to publish segments[%s]", segmentsAndMetadata.getSegments());
}
}
}
catch (IOException e) {
catch (Exception e) {
log.warn(
"Removing segments from deep storage after failed publish: %s",
segmentsAndMetadata.getSegments()
);
segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
throw Throwables.propagate(e);
}
}
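The driver now owns a DataSegmentKiller so that segments already uploaded to unique deep-storage locations can be removed on a best-effort basis whenever the push sanity check trips or publishing fails; without this, every failed publish would leak an orphaned copy under its UUID path. The cleanup pattern used above, condensed (illustrative wrapper):

import io.druid.segment.loading.DataSegmentKiller;
import io.druid.timeline.DataSegment;

import java.util.Collection;

final class BestEffortCleanupSketch
{
  // Best-effort removal of already-pushed segments after a failed publish; killQuietly swallows failures.
  static void cleanupAfterFailedPublish(DataSegmentKiller killer, Collection<DataSegment> pushedSegments)
  {
    pushedSegments.forEach(killer::killQuietly);
  }
}

StreamAppenderatorDriverFailTest.testFailDuringPublish further below verifies this behaviour by expecting exactly one killQuietly call per pushed segment on a strict mock.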

View File

@ -26,6 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.ListenableFutures;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import io.druid.timeline.DataSegment;
@ -64,10 +65,11 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver
public BatchAppenderatorDriver(
Appenderator appenderator,
SegmentAllocator segmentAllocator,
UsedSegmentChecker usedSegmentChecker
UsedSegmentChecker usedSegmentChecker,
DataSegmentKiller dataSegmentKiller
)
{
super(appenderator, segmentAllocator, usedSegmentChecker);
super(appenderator, segmentAllocator, usedSegmentChecker, dataSegmentKiller);
}
/**
@ -133,7 +135,7 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver
.collect(Collectors.toList());
final ListenableFuture<SegmentsAndMetadata> future = ListenableFutures.transformAsync(
pushInBackground(null, segmentIdentifierList),
pushInBackground(null, segmentIdentifierList, false),
this::dropInBackground
);

View File

@ -35,6 +35,7 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.ListenableFutures;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
@ -89,11 +90,12 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
SegmentAllocator segmentAllocator,
SegmentHandoffNotifierFactory handoffNotifierFactory,
UsedSegmentChecker usedSegmentChecker,
DataSegmentKiller dataSegmentKiller,
ObjectMapper objectMapper,
FireDepartmentMetrics metrics
)
{
super(appenderator, segmentAllocator, usedSegmentChecker);
super(appenderator, segmentAllocator, usedSegmentChecker, dataSegmentKiller);
this.handoffNotifier = Preconditions.checkNotNull(handoffNotifierFactory, "handoffNotifierFactory")
.createSegmentHandoffNotifier(appenderator.getDataSource());
@ -270,7 +272,9 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
.collect(Collectors.toList());
final ListenableFuture<SegmentsAndMetadata> publishFuture = ListenableFutures.transformAsync(
pushInBackground(wrapCommitter(committer), theSegments),
// useUniquePath=true prevents inconsistencies in segment data when task failures or replicas lead to a second
// version of a segment with the same identifier containing different data; see DataSegmentPusher.push() docs
pushInBackground(wrapCommitter(committer), theSegments, true),
sam -> publishInBackground(
sam,
publisher

View File

@ -448,13 +448,6 @@ public class RealtimePlumber implements Plumber
log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier());
// The realtime plumber can generate segments with the same identifier (i.e. replica tasks) but does not
// have any strict requirement that the contents of these segments be identical. It is possible that two
// tasks generate a segment with the same identifier containing different data, and in this situation we
// want to favor the data from the task which pushed first. This is because it is possible that one
// historical could load the segment after the first task pushed and another historical load the same
// segment after the second task pushed. If the second task's segment overwrote the first one, the second
// historical node would be serving different data from the first. Hence set replaceExisting == false.
DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)),

View File

@ -36,6 +36,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Set;
@ -204,12 +205,6 @@ public class LocalDataSegmentFinderTest
Assert.assertEquals(serializedSegment4_1, FileUtils.readFileToString(descriptor4_1));
}
private String getDescriptorPath(DataSegment segment)
{
final File indexzip = new File(String.valueOf(segment.getLoadSpec().get("path")));
return indexzip.getParent() + "/" + DESCRIPTOR_JSON;
}
@Test(expected = SegmentLoadingException.class)
public void testFindSegmentsFail() throws SegmentLoadingException
{
@ -219,4 +214,44 @@ public class LocalDataSegmentFinderTest
final LocalDataSegmentFinder localDataSegmentFinder = new LocalDataSegmentFinder(mapper);
localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), false);
}
@Test
public void testPreferNewestSegment() throws Exception
{
dataSourceDir = temporaryFolder.newFolder();
descriptor1 = new File(dataSourceDir.getAbsolutePath() + "/interval10/v10/0/older", DESCRIPTOR_JSON);
descriptor2 = new File(dataSourceDir.getAbsolutePath() + "/interval10/v10/0/newer", DESCRIPTOR_JSON);
descriptor1.getParentFile().mkdirs();
descriptor2.getParentFile().mkdirs();
mapper.writeValue(descriptor1, SEGMENT_1);
mapper.writeValue(descriptor2, SEGMENT_1);
indexZip1 = new File(descriptor1.getParentFile(), INDEX_ZIP);
indexZip2 = new File(descriptor2.getParentFile(), INDEX_ZIP);
FileOutputStream fos1 = new FileOutputStream(indexZip1);
fos1.getFD().sync();
fos1.close();
Thread.sleep(1000);
FileOutputStream fos2 = new FileOutputStream(indexZip2);
fos2.getFD().sync();
fos2.close();
final Set<DataSegment> segments = new LocalDataSegmentFinder(mapper).findSegments(
dataSourceDir.getAbsolutePath(), false
);
Assert.assertEquals(1, segments.size());
Assert.assertEquals(indexZip2.getAbsolutePath(), segments.iterator().next().getLoadSpec().get("path"));
}
private String getDescriptorPath(DataSegment segment)
{
final File indexzip = new File(String.valueOf(segment.getLoadSpec().get("path")));
return indexzip.getParent() + "/" + DESCRIPTOR_JSON;
}
}

View File

@ -31,6 +31,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
public class LocalDataSegmentKillerTest
{
@ -93,6 +94,28 @@ public class LocalDataSegmentKillerTest
Assert.assertFalse(dataSourceDir.exists());
}
@Test
public void testKillUniquePath() throws Exception
{
final LocalDataSegmentKiller killer = new LocalDataSegmentKiller(new LocalDataSegmentPusherConfig());
final String uuid = UUID.randomUUID().toString().substring(0, 5);
final File dataSourceDir = temporaryFolder.newFolder("dataSource");
final File intervalDir = new File(dataSourceDir, "interval");
final File versionDir = new File(intervalDir, "1");
final File partitionDir = new File(versionDir, "0");
final File uuidDir = new File(partitionDir, uuid);
makePartitionDirWithIndex(uuidDir);
killer.kill(getSegmentWithPath(new File(uuidDir, "index.zip").toString()));
Assert.assertFalse(uuidDir.exists());
Assert.assertFalse(partitionDir.exists());
Assert.assertFalse(versionDir.exists());
Assert.assertFalse(intervalDir.exists());
Assert.assertFalse(dataSourceDir.exists());
}
private void makePartitionDirWithIndex(File path) throws IOException
{
Assert.assertTrue(path.mkdirs());

View File

@ -92,8 +92,8 @@ public class LocalDataSegmentPusherTest
*/
final DataSegment dataSegment2 = dataSegment.withVersion("v2");
DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true);
DataSegment returnSegment2 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment2, true);
DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false);
DataSegment returnSegment2 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment2, false);
Assert.assertNotNull(returnSegment1);
Assert.assertEquals(dataSegment, returnSegment1);
@ -102,14 +102,14 @@ public class LocalDataSegmentPusherTest
Assert.assertEquals(dataSegment2, returnSegment2);
Assert.assertNotEquals(
localDataSegmentPusher.getStorageDir(dataSegment),
localDataSegmentPusher.getStorageDir(dataSegment2)
localDataSegmentPusher.getStorageDir(dataSegment, false),
localDataSegmentPusher.getStorageDir(dataSegment2, false)
);
for (DataSegment returnSegment : ImmutableList.of(returnSegment1, returnSegment2)) {
File outDir = new File(
config.getStorageDirectory(),
localDataSegmentPusher.getStorageDir(returnSegment)
localDataSegmentPusher.getStorageDir(returnSegment, false)
);
File versionFile = new File(outDir, "index.zip");
File descriptorJson = new File(outDir, "descriptor.json");
@ -119,34 +119,24 @@ public class LocalDataSegmentPusherTest
}
@Test
public void testFirstPushWinsForConcurrentPushesWhenReplaceExistingFalse() throws IOException
public void testPushUseUniquePath() throws IOException
{
DataSegment segment = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true);
String path = segment.getLoadSpec().get("path").toString();
String matcher = ".*/ds/1970-01-01T00:00:00\\.000Z_1970-01-01T00:00:00\\.001Z/v1/0/[A-Za-z0-9-]{36}/index\\.zip";
Assert.assertTrue(path, path.matches(matcher));
Assert.assertTrue(new File(path).exists());
}
@Test
public void testLastPushWinsForConcurrentPushes() throws IOException
{
File replicatedDataSegmentFiles = temporaryFolder.newFolder();
Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8));
DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false);
DataSegment returnSegment2 = localDataSegmentPusher.push(replicatedDataSegmentFiles, dataSegment2, false);
Assert.assertEquals(dataSegment.getDimensions(), returnSegment1.getDimensions());
Assert.assertEquals(dataSegment.getDimensions(), returnSegment2.getDimensions());
File unzipDir = new File(config.storageDirectory, "unzip");
FileUtils.forceMkdir(unzipDir);
CompressionUtils.unzip(
new File(config.storageDirectory, "/ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index.zip"),
unzipDir
);
Assert.assertEquals(0x9, Ints.fromByteArray(Files.toByteArray(new File(unzipDir, "version.bin"))));
}
@Test
public void testLastPushWinsForConcurrentPushesWhenReplaceExistingTrue() throws IOException
{
File replicatedDataSegmentFiles = temporaryFolder.newFolder();
Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8));
DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true);
DataSegment returnSegment2 = localDataSegmentPusher.push(replicatedDataSegmentFiles, dataSegment2, true);
Assert.assertEquals(dataSegment.getDimensions(), returnSegment1.getDimensions());
Assert.assertEquals(dataSegment2.getDimensions(), returnSegment2.getDimensions());
@ -168,7 +158,7 @@ public class LocalDataSegmentPusherTest
config.storageDirectory = new File(config.storageDirectory, "xxx");
Assert.assertTrue(config.storageDirectory.mkdir());
config.storageDirectory.setWritable(false);
localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true);
localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false);
}
@Test

View File

@ -117,7 +117,8 @@ public class AppenderatorTest
// push all
final SegmentsAndMetadata segmentsAndMetadata = appenderator.push(
appenderator.getSegments(),
committerSupplier.get()
committerSupplier.get(),
false
).get();
Assert.assertEquals(ImmutableMap.of("x", "3"), (Map<String, String>) segmentsAndMetadata.getCommitMetadata());
Assert.assertEquals(

View File

@ -193,7 +193,7 @@ public class AppenderatorTester implements AutoCloseable
}
@Override
public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
{
if (enablePushFailure && mustFail) {
mustFail = false;

View File

@ -27,10 +27,13 @@ import io.druid.data.input.MapBasedInputRow;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import io.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -41,7 +44,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class BatchAppenderatorDriverTest
public class BatchAppenderatorDriverTest extends EasyMockSupport
{
private static final String DATA_SOURCE = "foo";
private static final String VERSION = "abc123";
@ -69,22 +72,29 @@ public class BatchAppenderatorDriverTest
private SegmentAllocator allocator;
private AppenderatorTester appenderatorTester;
private BatchAppenderatorDriver driver;
private DataSegmentKiller dataSegmentKiller;
@Before
public void setup()
{
appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY);
allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
dataSegmentKiller = createStrictMock(DataSegmentKiller.class);
driver = new BatchAppenderatorDriver(
appenderatorTester.getAppenderator(),
allocator,
new TestUsedSegmentChecker(appenderatorTester)
new TestUsedSegmentChecker(appenderatorTester),
dataSegmentKiller
);
EasyMock.replay(dataSegmentKiller);
}
@After
public void tearDown() throws Exception
{
EasyMock.verify(dataSegmentKiller);
driver.clear();
driver.close();
}

View File

@ -34,15 +34,20 @@ import io.druid.data.input.MapBasedInputRow;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestCommitterSupplier;
import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentHandoffNotifierFactory;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.junit.After;
@ -64,7 +69,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
public class StreamAppenderatorDriverFailTest
public class StreamAppenderatorDriverFailTest extends EasyMockSupport
{
private static final String DATA_SOURCE = "foo";
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
@ -91,6 +96,7 @@ public class StreamAppenderatorDriverFailTest
SegmentAllocator allocator;
TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
StreamAppenderatorDriver driver;
DataSegmentKiller dataSegmentKiller;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@ -100,6 +106,7 @@ public class StreamAppenderatorDriverFailTest
{
allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
dataSegmentKiller = createStrictMock(DataSegmentKiller.class);
}
@After
@ -125,6 +132,7 @@ public class StreamAppenderatorDriverFailTest
allocator,
segmentHandoffNotifierFactory,
new NoopUsedSegmentChecker(),
dataSegmentKiller,
OBJECT_MAPPER,
new FireDepartmentMetrics()
);
@ -162,6 +170,7 @@ public class StreamAppenderatorDriverFailTest
allocator,
segmentHandoffNotifierFactory,
new NoopUsedSegmentChecker(),
dataSegmentKiller,
OBJECT_MAPPER,
new FireDepartmentMetrics()
);
@ -199,6 +208,7 @@ public class StreamAppenderatorDriverFailTest
allocator,
segmentHandoffNotifierFactory,
new NoopUsedSegmentChecker(),
dataSegmentKiller,
OBJECT_MAPPER,
new FireDepartmentMetrics()
);
@ -224,6 +234,94 @@ public class StreamAppenderatorDriverFailTest
driver.registerHandoff(published).get();
}
@Test
public void testFailDuringPublish() throws Exception
{
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
expectedException.expectMessage(
"Failed to publish segments[[DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z, dataSource='foo', binaryVersion='0'}, DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z, dataSource='foo', binaryVersion='0'}]]");
testFailDuringPublishInternal(false);
}
@Test
public void testFailWithExceptionDuringPublish() throws Exception
{
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(RuntimeException.class));
expectedException.expectMessage("test");
testFailDuringPublishInternal(true);
}
private void testFailDuringPublishInternal(boolean failWithException) throws Exception
{
driver = new StreamAppenderatorDriver(
new FailableAppenderator(),
allocator,
segmentHandoffNotifierFactory,
new NoopUsedSegmentChecker(),
dataSegmentKiller,
OBJECT_MAPPER,
new FireDepartmentMetrics()
);
driver.startJob();
final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
segmentHandoffNotifierFactory.setHandoffDelay(100);
Assert.assertNull(driver.startJob());
for (int i = 0; i < ROWS.size(); i++) {
committerSupplier.setMetadata(i + 1);
Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
}
dataSegmentKiller.killQuietly(new DataSegment(
"foo",
Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
"abc123",
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
new NumberedShardSpec(0, 0),
0,
0
));
EasyMock.expectLastCall().once();
dataSegmentKiller.killQuietly(new DataSegment(
"foo",
Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
"abc123",
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
new NumberedShardSpec(0, 0),
0,
0
));
EasyMock.expectLastCall().once();
EasyMock.replay(dataSegmentKiller);
try {
driver.publish(
StreamAppenderatorDriverTest.makeFailingPublisher(failWithException),
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
}
catch (Exception e) {
throw e;
}
finally {
EasyMock.verify(dataSegmentKiller);
}
}
private static class NoopUsedSegmentChecker implements UsedSegmentChecker
{
@Override
@ -304,7 +402,10 @@ public class StreamAppenderatorDriverFailTest
@Override
public AppenderatorAddResult add(
SegmentIdentifier identifier, InputRow row, Supplier<Committer> committerSupplier, boolean allowIncrementalPersists
SegmentIdentifier identifier,
InputRow row,
Supplier<Committer> committerSupplier,
boolean allowIncrementalPersists
)
{
rows.computeIfAbsent(identifier, k -> new ArrayList<>()).add(row);
@ -367,7 +468,7 @@ public class StreamAppenderatorDriverFailTest
@Override
public ListenableFuture<SegmentsAndMetadata> push(
Collection<SegmentIdentifier> identifiers, Committer committer
Collection<SegmentIdentifier> identifiers, Committer committer, boolean useUniquePath
)
{
if (pushEnabled) {

View File

@ -20,7 +20,6 @@
package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -38,11 +37,14 @@ import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
@ -61,7 +63,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class StreamAppenderatorDriverTest
public class StreamAppenderatorDriverTest extends EasyMockSupport
{
private static final String DATA_SOURCE = "foo";
private static final String VERSION = "abc123";
@ -93,26 +95,33 @@ public class StreamAppenderatorDriverTest
private AppenderatorTester appenderatorTester;
private TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
private StreamAppenderatorDriver driver;
private DataSegmentKiller dataSegmentKiller;
@Before
public void setUp()
public void setUp() throws Exception
{
appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY);
allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
dataSegmentKiller = createStrictMock(DataSegmentKiller.class);
driver = new StreamAppenderatorDriver(
appenderatorTester.getAppenderator(),
allocator,
segmentHandoffNotifierFactory,
new TestUsedSegmentChecker(appenderatorTester),
dataSegmentKiller,
OBJECT_MAPPER,
new FireDepartmentMetrics()
);
EasyMock.replay(dataSegmentKiller);
}
@After
public void tearDown() throws Exception
{
EasyMock.verify(dataSegmentKiller);
driver.clear();
driver.close();
}
@ -345,30 +354,21 @@ public class StreamAppenderatorDriverTest
private Set<SegmentIdentifier> asIdentifiers(Iterable<DataSegment> segments)
{
return ImmutableSet.copyOf(
Iterables.transform(
segments,
new Function<DataSegment, SegmentIdentifier>()
{
@Override
public SegmentIdentifier apply(DataSegment input)
{
return SegmentIdentifier.fromDataSegment(input);
}
}
)
);
return ImmutableSet.copyOf(Iterables.transform(segments, SegmentIdentifier::fromDataSegment));
}
static TransactionalSegmentPublisher makeOkPublisher()
{
return new TransactionalSegmentPublisher()
{
@Override
public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata)
{
return true;
return (segments, commitMetadata) -> true;
}
static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException)
{
return (segments, commitMetadata) -> {
if (failWithException) {
throw new RuntimeException("test");
}
return false;
};
}

View File

@ -154,7 +154,7 @@ public class CliRealtimeExample extends ServerRunnable
}
@Override
public DataSegment push(File file, DataSegment segment, boolean replaceExisting)
public DataSegment push(File file, DataSegment segment, boolean useUniquePath)
{
return segment;
}