Skip to content

Commit

Permalink
Bugfix/#286 closed channel exception (#393)
Browse files Browse the repository at this point in the history
* fixes #286 closed_channel_exception
reverts some resource-leak refactorings 5e8ba71 which closes the resources to eagerly

Co-authored-by: Johannes Visintini <[email protected]>
  • Loading branch information
rtroilo and joker234 authored Jul 5, 2021
1 parent 3c40ffc commit 411df89
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,10 @@ public void loadTags() {
values = new ArrayList<>(kvp.valuesNumber);

valuesChannel.position(kvp.valuesOffset);
try(DataInputStream valueStream = new DataInputStream(Channels.newInputStream(valuesChannel));){
for (int j = 0; j < kvp.valuesNumber; j++) {
final VF vf = VF.read(valueStream);
values.add(vf.value);
}
DataInputStream valueStream = new DataInputStream(Channels.newInputStream(valuesChannel));
for (int j = 0; j < kvp.valuesNumber; j++) {
final VF vf = VF.read(valueStream);
values.add(vf.value);
}

handler.loadKeyValues(i, key, values);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
Expand Down Expand Up @@ -104,29 +103,27 @@ private static StringToIdMappingImpl value2IdMapping(KeyValuePointer kvp, FileCh
final Int2IntAVLTreeMap valueHash2Cnt = new Int2IntAVLTreeMap();

channel.position(kvp.valuesOffset);
try(DataInputStream valueStream = new DataInputStream(new BufferedInputStream(Channels.newInputStream(channel),1024*1024));){
for (int j = 0; j < kvp.valuesNumber; j++) {
final VF vf = VF.read(valueStream);
final int hash = hashFunction.applyAsInt(vf.value);
valueHash2Cnt.addTo(hash, 1);
}
DataInputStream valueStream = new DataInputStream(new BufferedInputStream(Channels.newInputStream(channel),1024*1024));
for (int j = 0; j < kvp.valuesNumber; j++) {
final VF vf = VF.read(valueStream);
final int hash = hashFunction.applyAsInt(vf.value);
valueHash2Cnt.addTo(hash, 1);
}

final Int2IntMap uniqueValues = new Int2IntAVLTreeMap();
final Object2IntMap<String> notUniqueValues = new Object2IntAVLTreeMap<String>();
long estimatedSize = 0;
channel.position(kvp.valuesOffset);
try(DataInputStream valueStream = new DataInputStream(Channels.newInputStream(channel));){
for (int j = 0; j < kvp.valuesNumber; j++) {
final VF vf = VF.read(valueStream);
final int hash = hashFunction.applyAsInt(vf.value);
if (valueHash2Cnt.get(hash) > 1) {
notUniqueValues.put(vf.value, j);
estimatedSize += SizeEstimator.estimatedSizeOfAVLEntryValue(kvp.key)+4;
} else {
uniqueValues.put(hash, j);
estimatedSize += SizeEstimator.avlTreeEntry() + 8;
}
valueStream = new DataInputStream(Channels.newInputStream(channel));
for (int j = 0; j < kvp.valuesNumber; j++) {
final VF vf = VF.read(valueStream);
final int hash = hashFunction.applyAsInt(vf.value);
if (valueHash2Cnt.get(hash) > 1) {
notUniqueValues.put(vf.value, j);
estimatedSize += SizeEstimator.estimatedSizeOfAVLEntryValue(kvp.key)+4;
} else {
uniqueValues.put(hash, j);
estimatedSize += SizeEstimator.avlTreeEntry() + 8;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,59 +142,56 @@ private static boolean seekStartPos(RandomAccessFile raf, long pos, long limit)
public static long seekBlobHeaderStart(final InputStream is, final long limit) throws IOException {
long totalBytesRead = 0;
final byte[] pushBackBytes = new byte[BLOBHEADER_SIZE_BYTES + SIGNATURE_SIZE_BYTES];
try(PushbackInputStream pushBackStream = new PushbackInputStream(is, pushBackBytes.length);) {
for (int i = 0; i < BLOBHEADER_SIZE_BYTES; i++) {
pushBackBytes[i] = (byte) pushBackStream.read();
totalBytesRead++;
}

int nextByte = pushBackStream.read();
PushbackInputStream pushBackStream = new PushbackInputStream(is, pushBackBytes.length);
for (int i = 0; i < BLOBHEADER_SIZE_BYTES; i++) {
pushBackBytes[i] = (byte) pushBackStream.read();
totalBytesRead++;
int val = 0;
while (nextByte != -1) {
if ((val < SIGNATURE_OSMDATA.length && SIGNATURE_OSMDATA[val] == nextByte)
|| (val < SIGNATURE_OSMHEADER.length && SIGNATURE_OSMHEADER[val] == nextByte)) {

}

int nextByte = pushBackStream.read();
totalBytesRead++;
int val = 0;
while (nextByte != -1) {
if ((val < SIGNATURE_OSMDATA.length && SIGNATURE_OSMDATA[val] == nextByte)
|| (val < SIGNATURE_OSMHEADER.length && SIGNATURE_OSMHEADER[val] == nextByte)) {

pushBackBytes[BLOBHEADER_SIZE_BYTES + val] = (byte) nextByte;

if (val == SIGNATURE_OSMDATA.length - 1 || val == SIGNATURE_OSMHEADER.length - 1) {
// Full OSMHeader\Data SIGNATURE is found.
pushBackStream.unread(pushBackBytes, 0, BLOBHEADER_SIZE_BYTES + val + 1);
totalBytesRead -= BLOBHEADER_SIZE_BYTES + val + 1;
return totalBytesRead;
}
val++;
} else if (val != 0) {
// break
if (limit > 0 && totalBytesRead > limit) {
return -1;
}

val = 0;
if (SIGNATURE_OSMDATA[val] == nextByte || SIGNATURE_OSMHEADER[val] == nextByte) {
pushBackBytes[BLOBHEADER_SIZE_BYTES + val] = (byte) nextByte;

if (val == SIGNATURE_OSMDATA.length - 1 || val == SIGNATURE_OSMHEADER.length - 1) {
// Full OSMHeader\Data SIGNATURE is found.
pushBackStream.unread(pushBackBytes, 0, BLOBHEADER_SIZE_BYTES + val + 1);
totalBytesRead -= BLOBHEADER_SIZE_BYTES + val + 1;
return totalBytesRead;
}
val++;
} else if (val != 0) {
// break
if (limit > 0 && totalBytesRead > limit) {
return -1;
}

val = 0;
if (SIGNATURE_OSMDATA[val] == nextByte || SIGNATURE_OSMHEADER[val] == nextByte) {
pushBackBytes[BLOBHEADER_SIZE_BYTES + val] = (byte) nextByte;
val++;
} else {
for (int i = 0; i < 3; i++) {
pushBackBytes[i] = pushBackBytes[i + 1];
}
pushBackBytes[3] = (byte) nextByte;
}

} else {
for (int i = 0; i < 3; i++) {
pushBackBytes[i] = pushBackBytes[i + 1];
}
pushBackBytes[3] = (byte) nextByte;
}

nextByte = pushBackStream.read();
totalBytesRead++;

} else {
for (int i = 0; i < 3; i++) {
pushBackBytes[i] = pushBackBytes[i + 1];
}
pushBackBytes[3] = (byte) nextByte;
}

// we reach the end of stream and found no signature
return -1;
nextByte = pushBackStream.read();
totalBytesRead++;
}
// we reach the end of stream and found no signature
return -1;
}

private static BiFunction<PbfChannel, Emitter<PbfBlob>, PbfChannel> readBlobFromChannel() {
Expand Down

0 comments on commit 411df89

Please sign in to comment.