Improve error message for revoked locks (#7035)

* Improve error message for revoked locks

* fix test

* fix test

* fix test

* fix toString
This commit is contained in:
Jihoon Son 2019-02-13 11:22:48 -08:00 committed by GitHub
parent b1c4a5de0d
commit 1701fbcad3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 154 additions and 38 deletions

View File

@ -118,7 +118,12 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli
endMetadata
)
)
.onInvalidLocks(SegmentPublishResult::fail)
.onInvalidLocks(
() -> SegmentPublishResult.fail(
"Invalid task locks. Maybe they are revoked by a higher priority task."
+ " Please check the overlord log for details."
)
)
.build()
);
}

View File

@ -101,7 +101,7 @@ public class SegmentTransactionalInsertActionTest
task,
actionTestKit.getTaskActionToolbox()
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(SEGMENT1), true), result1);
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT1)), result1);
SegmentPublishResult result2 = new SegmentTransactionalInsertAction(
ImmutableSet.of(SEGMENT2),
@ -111,7 +111,7 @@ public class SegmentTransactionalInsertActionTest
task,
actionTestKit.getTaskActionToolbox()
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(SEGMENT2), true), result2);
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT2)), result2);
Assert.assertEquals(
ImmutableSet.of(SEGMENT1, SEGMENT2),
@ -143,7 +143,7 @@ public class SegmentTransactionalInsertActionTest
actionTestKit.getTaskActionToolbox()
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result);
Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result);
}
@Test
@ -157,6 +157,6 @@ public class SegmentTransactionalInsertActionTest
thrown.expect(IllegalStateException.class);
thrown.expectMessage(CoreMatchers.containsString("are not covered by locks"));
SegmentPublishResult result = action.perform(task, actionTestKit.getTaskActionToolbox());
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(SEGMENT3), true), result);
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT3)), result);
}
}

View File

@ -1574,10 +1574,7 @@ public class IndexTaskTest
}
if (taskAction instanceof SegmentTransactionalInsertAction) {
return (RetType) new SegmentPublishResult(
((SegmentTransactionalInsertAction) taskAction).getSegments(),
true
);
return (RetType) SegmentPublishResult.ok(((SegmentTransactionalInsertAction) taskAction).getSegments());
}
if (taskAction instanceof SegmentAllocateAction) {

View File

@ -116,7 +116,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
)
{
// Don't actually compare metadata, just do it!
return new SegmentPublishResult(announceHistoricalSegments(segments), true);
return SegmentPublishResult.ok(announceHistoricalSegments(segments));
}
@Override

View File

@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.Objects;
import java.util.Set;
@ -42,20 +43,29 @@ public class SegmentPublishResult
{
private final Set<DataSegment> segments;
private final boolean success;
@Nullable
private final String errorMsg;
public static SegmentPublishResult fail()
public static SegmentPublishResult ok(Set<DataSegment> segments)
{
return new SegmentPublishResult(ImmutableSet.of(), false);
return new SegmentPublishResult(segments, true, null);
}
public static SegmentPublishResult fail(String errorMsg)
{
return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg);
}
@JsonCreator
public SegmentPublishResult(
private SegmentPublishResult(
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("success") boolean success
@JsonProperty("success") boolean success,
@JsonProperty("errorMsg") @Nullable String errorMsg
)
{
this.segments = Preconditions.checkNotNull(segments, "segments");
this.success = success;
this.errorMsg = errorMsg;
if (!success) {
Preconditions.checkArgument(segments.isEmpty(), "segments must be empty for unsuccessful publishes");
@ -74,6 +84,13 @@ public class SegmentPublishResult
return success;
}
@JsonProperty
@Nullable
public String getErrorMsg()
{
return errorMsg;
}
@Override
public boolean equals(Object o)
{
@ -85,13 +102,14 @@ public class SegmentPublishResult
}
SegmentPublishResult that = (SegmentPublishResult) o;
return success == that.success &&
Objects.equals(segments, that.segments);
Objects.equals(segments, that.segments) &&
Objects.equals(errorMsg, that.errorMsg);
}
@Override
public int hashCode()
{
return Objects.hash(segments, success);
return Objects.hash(segments, success, errorMsg);
}
@Override
@ -100,6 +118,7 @@ public class SegmentPublishResult
return "SegmentPublishResult{" +
"segments=" + segments +
", success=" + success +
", errorMsg='" + errorMsg + '\'' +
'}';
}
}

View File

@ -324,7 +324,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
}
return new SegmentPublishResult(ImmutableSet.copyOf(inserted), true);
return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
}
},
3,
@ -333,7 +333,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
catch (CallbackFailedException e) {
if (definitelyNotUpdated.get()) {
return SegmentPublishResult.fail();
return SegmentPublishResult.fail(e.getMessage());
} else {
// Must throw exception if we are not sure if we updated or not.
throw e;

View File

@ -36,6 +36,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
@ -553,16 +554,26 @@ public abstract class BaseAppenderatorDriver implements Closeable
try {
final Object metadata = segmentsAndMetadata.getCommitMetadata();
final boolean published = publisher.publishSegments(
final SegmentPublishResult publishResult = publisher.publishSegments(
ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
).isSuccess();
);
if (published) {
if (publishResult.isSuccess()) {
log.info("Published segments.");
} else {
log.info("Transaction failure while publishing segments, removing them from deep storage "
+ "and checking if someone else beat us to publishing.");
if (publishResult.getErrorMsg() == null) {
log.warn(
"Transaction failure while publishing segments. Please check the overlord log."
+ " Removing them from deep storage and checking if someone else beat us to publishing."
);
} else {
log.warn(
"Transaction failure while publishing segments because of [%s]. Please check the overlord log."
+ " Removing them from deep storage and checking if someone else beat us to publishing.",
publishResult.getErrorMsg()
);
}
segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
@ -576,7 +587,11 @@ public abstract class BaseAppenderatorDriver implements Closeable
.equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
log.info("Our segments really do exist, awaiting handoff.");
} else {
throw new ISE("Failed to publish segments.");
if (publishResult.getErrorMsg() != null) {
throw new ISE("Failed to publish segments because of [%s].", publishResult.getErrorMsg());
} else {
throw new ISE("Failed to publish segments.");
}
}
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneLoadSpecHolder;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class SegmentPublishResultTest
{
private final ObjectMapper objectMapper = new DefaultObjectMapper()
.setInjectableValues(new Std().addValue(PruneLoadSpecHolder.class, PruneLoadSpecHolder.DEFAULT));
@Test
public void testSerdeOkResult() throws IOException
{
final SegmentPublishResult original = SegmentPublishResult.ok(
ImmutableSet.of(
segment(Intervals.of("2018/2019")),
segment(Intervals.of("2019/2020"))
)
);
final String json = objectMapper.writeValueAsString(original);
final SegmentPublishResult fromJson = objectMapper.readValue(json, SegmentPublishResult.class);
Assert.assertEquals(original, fromJson);
}
@Test
public void testSerdeFailResult() throws IOException
{
final SegmentPublishResult original = SegmentPublishResult.fail("test");
final String json = objectMapper.writeValueAsString(original);
final SegmentPublishResult fromJson = objectMapper.readValue(json, SegmentPublishResult.class);
Assert.assertEquals(original, fromJson);
}
private static DataSegment segment(Interval interval)
{
return new DataSegment(
"ds",
interval,
"version",
null,
null,
null,
null,
9,
10L
);
}
}

View File

@ -304,7 +304,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "bar"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1);
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1);
Assert.assertArrayEquals(
mapper.writeValueAsString(defaultSegment).getBytes(StandardCharsets.UTF_8),
@ -322,7 +322,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
new ObjectMetadata(ImmutableMap.of("foo", "bar")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment2), true), result2);
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment2)), result2);
Assert.assertArrayEquals(
mapper.writeValueAsString(defaultSegment2).getBytes(StandardCharsets.UTF_8),
@ -378,7 +378,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "bar"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1);
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1);
Assert.assertArrayEquals(
mapper.writeValueAsString(defaultSegment).getBytes(StandardCharsets.UTF_8),
@ -399,7 +399,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
new ObjectMetadata(ImmutableMap.of("foo", "bar")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment2), true), result2);
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment2)), result2);
Assert.assertArrayEquals(
mapper.writeValueAsString(defaultSegment2).getBytes(StandardCharsets.UTF_8),
@ -429,7 +429,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
new ObjectMetadata(ImmutableMap.of("foo", "bar")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result1);
Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result1);
// Should only be tried once.
Assert.assertEquals(1, metadataUpdateCounter.get());
@ -443,14 +443,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1);
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1);
final SegmentPublishResult result2 = coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment2),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result2);
Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result2);
// Should only be tried once per call.
Assert.assertEquals(2, metadataUpdateCounter.get());
@ -464,14 +464,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1);
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1);
final SegmentPublishResult result2 = coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment2),
new ObjectMetadata(ImmutableMap.of("foo", "qux")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result2);
Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result2);
// Should only be tried once per call.
Assert.assertEquals(2, metadataUpdateCounter.get());

View File

@ -195,6 +195,6 @@ public class BatchAppenderatorDriverTest extends EasyMockSupport
static TransactionalSegmentPublisher makeOkPublisher()
{
return (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true);
return (segments, commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of());
}
}

View File

@ -239,7 +239,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
{
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
expectedException.expectMessage("Failed to publish segments.");
expectedException.expectMessage("Failed to publish segments because of [test].");
testFailDuringPublishInternal(false);
}

View File

@ -361,16 +361,17 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
static TransactionalSegmentPublisher makeOkPublisher()
{
return (segments, commitMetadata) -> new SegmentPublishResult(Collections.emptySet(), true);
return (segments, commitMetadata) -> SegmentPublishResult.ok(Collections.emptySet());
}
static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException)
{
return (segments, commitMetadata) -> {
final RuntimeException exception = new RuntimeException("test");
if (failWithException) {
throw new RuntimeException("test");
throw exception;
}
return SegmentPublishResult.fail();
return SegmentPublishResult.fail(exception.getMessage());
};
}