Fix the reported number of bytes read (elastic/elasticsearch#544)
Original commit: elastic/x-pack-elasticsearch@a98902f309
This commit is contained in:
parent
b386ed33a1
commit
7efdbd9320
|
@ -31,45 +31,30 @@ public class CountingInputStream extends FilterInputStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* We don't care if the count is one byte out
|
* Report 1 byte read
|
||||||
* because we don't check for the case where read
|
|
||||||
* returns -1.
|
|
||||||
* <p>
|
|
||||||
* One of the buffered read(..) methods is more likely to
|
|
||||||
* be called anyway.
|
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public int read() throws IOException {
|
public int read() throws IOException {
|
||||||
statusReporter.reportBytesRead(1);
|
int read = in.read();
|
||||||
|
statusReporter.reportBytesRead(read < 0 ? 0 : 1);
|
||||||
|
|
||||||
return in.read();
|
return read;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Don't bother checking for the special case where
|
|
||||||
* the stream is closed/finished and read returns -1.
|
|
||||||
* Our count will be 1 byte out.
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public int read(byte[] b) throws IOException {
|
public int read(byte[] b) throws IOException {
|
||||||
int read = in.read(b);
|
int read = in.read(b);
|
||||||
|
|
||||||
statusReporter.reportBytesRead(read);
|
statusReporter.reportBytesRead(read < 0 ? 0 : read);
|
||||||
|
|
||||||
return read;
|
return read;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Don't bother checking for the special case where
|
|
||||||
* the stream is closed/finished and read returns -1.
|
|
||||||
* Our count will be 1 byte out.
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public int read(byte[] b, int off, int len) throws IOException {
|
public int read(byte[] b, int off, int len) throws IOException {
|
||||||
int read = in.read(b, off, len);
|
int read = in.read(b, off, len);
|
||||||
|
|
||||||
statusReporter.reportBytesRead(read);
|
statusReporter.reportBytesRead(read < 0 ? 0 : read);
|
||||||
return read;
|
return read;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,9 +29,7 @@ public class CountingInputStreamTests extends ESTestCase {
|
||||||
|
|
||||||
try (CountingInputStream counting = new CountingInputStream(source, statusReporter)) {
|
try (CountingInputStream counting = new CountingInputStream(source, statusReporter)) {
|
||||||
while (counting.read() >= 0) {}
|
while (counting.read() >= 0) {}
|
||||||
// an extra byte is read because we don't check the return
|
Assert.assertEquals(TEXT.length(), usageReporter.getBytesReadSinceLastReport());
|
||||||
// value of the read() method
|
|
||||||
Assert.assertEquals(TEXT.length() + 1, usageReporter.getBytesReadSinceLastReport());
|
|
||||||
|
|
||||||
Assert.assertEquals(usageReporter.getBytesReadSinceLastReport(), statusReporter.getBytesRead());
|
Assert.assertEquals(usageReporter.getBytesReadSinceLastReport(), statusReporter.getBytesRead());
|
||||||
}
|
}
|
||||||
|
@ -48,10 +46,7 @@ public class CountingInputStreamTests extends ESTestCase {
|
||||||
try (CountingInputStream counting = new CountingInputStream(source, statusReporter)) {
|
try (CountingInputStream counting = new CountingInputStream(source, statusReporter)) {
|
||||||
byte buf[] = new byte[256];
|
byte buf[] = new byte[256];
|
||||||
while (counting.read(buf) >= 0) {}
|
while (counting.read(buf) >= 0) {}
|
||||||
// one less byte is reported because we don't check
|
Assert.assertEquals(TEXT.length(), usageReporter.getBytesReadSinceLastReport());
|
||||||
// the return value of the read() method
|
|
||||||
Assert.assertEquals(TEXT.length() - 1, usageReporter.getBytesReadSinceLastReport());
|
|
||||||
|
|
||||||
Assert.assertEquals(usageReporter.getBytesReadSinceLastReport(), statusReporter.getBytesRead());
|
Assert.assertEquals(usageReporter.getBytesReadSinceLastReport(), statusReporter.getBytesRead());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -67,10 +62,7 @@ public class CountingInputStreamTests extends ESTestCase {
|
||||||
try (CountingInputStream counting = new CountingInputStream(source, statusReporter)) {
|
try (CountingInputStream counting = new CountingInputStream(source, statusReporter)) {
|
||||||
byte buf[] = new byte[8];
|
byte buf[] = new byte[8];
|
||||||
while (counting.read(buf, 0, 8) >= 0) {}
|
while (counting.read(buf, 0, 8) >= 0) {}
|
||||||
// an extra byte is read because we don't check the return
|
Assert.assertEquals(TEXT.length(), usageReporter.getBytesReadSinceLastReport());
|
||||||
// value of the read() method
|
|
||||||
Assert.assertEquals(TEXT.length() - 1, usageReporter.getBytesReadSinceLastReport());
|
|
||||||
|
|
||||||
Assert.assertEquals(usageReporter.getBytesReadSinceLastReport(), statusReporter.getBytesRead());
|
Assert.assertEquals(usageReporter.getBytesReadSinceLastReport(), statusReporter.getBytesRead());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,7 @@ setup:
|
||||||
|
|
||||||
- match: { processed_record_count: 2 }
|
- match: { processed_record_count: 2 }
|
||||||
- match: { processed_field_count: 4}
|
- match: { processed_field_count: 4}
|
||||||
- match: { input_bytes: 177 }
|
- match: { input_bytes: 178 }
|
||||||
- match: { input_field_count: 6 }
|
- match: { input_field_count: 6 }
|
||||||
- match: { invalid_date_count: 0 }
|
- match: { invalid_date_count: 0 }
|
||||||
- match: { missing_field_count: 0 }
|
- match: { missing_field_count: 0 }
|
||||||
|
@ -63,7 +63,7 @@ setup:
|
||||||
|
|
||||||
- match: { _source.processed_record_count: 2 }
|
- match: { _source.processed_record_count: 2 }
|
||||||
- match: { _source.processed_field_count: 4}
|
- match: { _source.processed_field_count: 4}
|
||||||
- match: { _source.input_bytes: 177 }
|
- match: { _source.input_bytes: 178 }
|
||||||
- match: { _source.input_field_count: 6 }
|
- match: { _source.input_field_count: 6 }
|
||||||
- match: { _source.invalid_date_count: 0 }
|
- match: { _source.invalid_date_count: 0 }
|
||||||
- match: { _source.missing_field_count: 0 }
|
- match: { _source.missing_field_count: 0 }
|
||||||
|
|
Loading…
Reference in New Issue