Fix Appenderator.push() to commit the metadata of all segments (#5730)

* Remove persist from Appenderator

* fix javadoc
This commit is contained in:
Jihoon Son 2018-05-02 13:17:54 -07:00 committed by Slim Bouguerra
parent d4311b4a5a
commit 2c8296f94d
4 changed files with 29 additions and 55 deletions

View File

@ -37,9 +37,9 @@ import java.util.List;
* both of those. It can also push data to deep storage. But, it does not decide which segments data should go into.
* It also doesn't publish segments to the metadata store or monitor handoff; you have to do that yourself!
* <p>
* You can provide a {@link Committer} or a Supplier of one when you call one of the methods that adds, persists, or
* pushes data. The Committer should represent all data you have given to the Appenderator so far. This Committer will
* be used when that data has been persisted to disk.
* You can provide a {@link Committer} or a Supplier of one when you call one of the methods that {@link #add},
* {@link #persistAll}, or {@link #push}. The Committer should represent all data you have given to the Appenderator so
* far. This Committer will be used when that data has been persisted to disk.
*/
public interface Appenderator extends QuerySegmentWalker, Closeable
{
@ -74,8 +74,9 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
* Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used
* asynchronously.
* <p>
* If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll,
* and push methods should all be called from the same thread to keep the metadata committed by Committer in sync.
* If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear},
* {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata
* committed by Committer in sync.
*
* @param identifier the segment into which this row should be added
* @param row the row to add
@ -129,8 +130,8 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
* for some reason, rows have been added that we do not actually want to hand off. Blocks until all data has been
* cleared. This may take some time, since all pending persists must finish first.
* <p>
* The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the
* metadata committed by Committer in sync.
* {@link #add}, {@link #clear}, {@link #persistAll}, and {@link #push} methods should all be called from the same
* thread to keep the metadata committed by Committer in sync.
*/
void clear() throws InterruptedException;
@ -148,50 +149,31 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
*/
ListenableFuture<?> drop(SegmentIdentifier identifier);
/**
* Persist any in-memory indexed data for segments of the given identifiers to durable storage. This may be only
* somewhat durable, e.g. the machine's local disk. The Committer will be made synchronously with the call to
* persist, but will actually be used asynchronously. Any metadata returned by the committer will be associated with
* the data persisted to disk.
* <p>
* If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll,
* and push methods should all be called from the same thread to keep the metadata committed by Committer in sync.
*
* @param identifiers segment identifiers to be persisted
* @param committer a committer associated with all data that has been added to segments of the given identifiers so
* far
*
* @return future that resolves when all pending data to segments of the identifiers has been persisted, contains
* commit metadata for this persist
*/
ListenableFuture<Object> persist(Collection<SegmentIdentifier> identifiers, @Nullable Committer committer);
/**
* Persist any in-memory indexed data to durable storage. This may be only somewhat durable, e.g. the
* machine's local disk. The Committer will be made synchronously with the call to persistAll, but will actually
* be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to
* disk.
* <p>
* If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll,
* and push methods should all be called from the same thread to keep the metadata committed by Committer in sync.
* If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear},
* {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata
* committed by Committer in sync.
*
* @param committer a committer associated with all data that has been added so far
*
* @return future that resolves when all pending data has been persisted, contains commit metadata for this persist
*/
default ListenableFuture<Object> persistAll(@Nullable Committer committer)
{
return persist(getSegments(), committer);
}
ListenableFuture<Object> persistAll(@Nullable Committer committer);
/**
* Merge and push particular segments to deep storage. This will trigger an implicit
* {@link #persist(Collection, Committer)} using the provided Committer.
* {@link #persistAll(Committer)} using the provided Committer.
* <p>
* After this method is called, you cannot add new data to any segments that were previously under construction.
* <p>
* If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll,
* and push methods should all be called from the same thread to keep the metadata committed by Committer in sync.
* If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear},
* {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata
* committed by Committer in sync.
*
* @param identifiers list of segments to push
* @param committer a committer associated with all data that has been added so far

View File

@ -392,12 +392,12 @@ public class AppenderatorImpl implements Appenderator
}
@Override
public ListenableFuture<Object> persist(Collection<SegmentIdentifier> identifiers, @Nullable Committer committer)
public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
{
final Map<String, Integer> currentHydrants = Maps.newHashMap();
final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();
int numPersistedRows = 0;
for (SegmentIdentifier identifier : identifiers) {
for (SegmentIdentifier identifier : sinks.keySet()) {
final Sink sink = sinks.get(identifier);
if (sink == null) {
throw new ISE("No sink for identifier: %s", identifier);
@ -499,13 +499,6 @@ public class AppenderatorImpl implements Appenderator
return future;
}
@Override
public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
{
// Submit persistAll task to the persistExecutor
return persist(sinks.keySet(), committer);
}
@Override
public ListenableFuture<SegmentsAndMetadata> push(
final Collection<SegmentIdentifier> identifiers,
@ -524,7 +517,9 @@ public class AppenderatorImpl implements Appenderator
}
return Futures.transform(
persist(identifiers, committer),
// We should always persist all segments regardless of the input because metadata should be committed for all
// segments.
persistAll(committer),
(Function<Object, SegmentsAndMetadata>) commitMetadata -> {
final List<DataSegment> dataSegments = Lists.newArrayList();

View File

@ -190,10 +190,9 @@ public class AppenderatorTest
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier);
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.persist(ImmutableList.of(IDENTIFIERS.get(1)), committerSupplier.get());
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
appenderator.persistAll(committerSupplier.get());
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
}
}
@ -237,12 +236,12 @@ public class AppenderatorTest
Assert.assertEquals(4, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier, false);
Assert.assertEquals(5, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.persist(ImmutableList.of(IDENTIFIERS.get(1)), committerSupplier.get());
Assert.assertEquals(3, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
appenderator.persistAll(committerSupplier.get());
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
}
}
@Test
public void testRestoreFromDisk() throws Exception
{

View File

@ -454,15 +454,13 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
}
@Override
public ListenableFuture<Object> persist(
Collection<SegmentIdentifier> identifiers, Committer committer
)
public ListenableFuture<Object> persistAll(Committer committer)
{
if (persistEnabled) {
// do nothing
return Futures.immediateFuture(committer.getMetadata());
} else {
return Futures.immediateFailedFuture(new ISE("Fail test while persisting segments[%s]", identifiers));
return Futures.immediateFailedFuture(new ISE("Fail test while persisting segments[%s]", rows.keySet()));
}
}
@ -488,7 +486,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
)
.collect(Collectors.toList());
return Futures.transform(
persist(identifiers, committer),
persistAll(committer),
(Function<Object, SegmentsAndMetadata>) commitMetadata -> new SegmentsAndMetadata(segments, commitMetadata)
);
} else {