Socket timeout while opening Blueprint FTP links #40

Open
iromeo opened this issue Jun 16, 2018 · 7 comments

iromeo commented Jun 16, 2018

Socket timeout while opening Blueprint FTP links.
E.g. try ftp://ftp.ebi.ac.uk/pub/databases/blueprint/blueprint_progenitor_methylomes/rna/RNA_D1_CLP_100.bw

It is not clear why this happens, but the problem is in htsjdk: it implements its own FTP client, which hangs while logging in to the Blueprint FTP server (samtools/htsjdk#797).
The Apache FTPClient works OK.
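
A hang like this is easier to diagnose when every blocking call is bounded by a timeout. The following is a minimal sketch using only JDK sockets, not htsjdk's or Apache's client; `readFtpGreeting`, its default port, and the 5-second default timeout are assumptions made for illustration. It only reads the server greeting and does not log in.

```kotlin
import java.io.BufferedReader
import java.io.InputStreamReader
import java.net.InetSocketAddress
import java.net.Socket

/**
 * Hypothetical helper: opens a plain TCP connection to an FTP control port
 * and returns the first line of the server greeting (null if the server
 * closes the connection without sending anything).
 * Both the connect and the read are bounded by [timeoutMs], so a stalled
 * server raises java.net.SocketTimeoutException instead of blocking forever.
 */
fun readFtpGreeting(host: String, port: Int = 21, timeoutMs: Int = 5_000): String? =
    Socket().use { socket ->
        socket.connect(InetSocketAddress(host, port), timeoutMs) // bounded connect
        socket.soTimeout = timeoutMs                             // bounded reads
        BufferedReader(InputStreamReader(socket.getInputStream(), Charsets.US_ASCII)).readLine()
    }
```

Against a server that accepts the connection but never answers, this throws `SocketTimeoutException` after `timeoutMs` rather than hanging indefinitely.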

iromeo commented Jun 16, 2018

As a workaround, we could rewrite SeekableFTPStreamHelper using the Apache FTP client. E.g., a working proof of concept is:

/*
 * The MIT License
 *
 * Copyright (c) 2013 The Broad Institute
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

package org.jetbrains.bio.util

import htsjdk.samtools.seekablestream.*
import htsjdk.samtools.seekablestream.SeekableStreamFactory
import htsjdk.samtools.util.IOUtil
import org.apache.commons.net.ftp.FTPClient
import org.apache.commons.net.ftp.FTPClientConfig
import java.io.EOFException
import java.io.File
import java.io.IOException
import java.net.URL
import java.nio.channels.SeekableByteChannel
import java.util.function.Function


/**
 * @author Roman.Chernyatchik
 */
object SeekableStreamFactory {
    fun getStreamFor(path: String,
                     wrapper: Function<SeekableByteChannel, SeekableByteChannel>? = null): SeekableStream {

        // Pick the stream implementation from the URL scheme; plain paths fall back to files.
        return when {
            path.startsWith("http:") || path.startsWith("https:") -> SeekableHTTPStream(URL(path))
            path.startsWith("ftp:") -> SeekableFTPStream(URL(path))
            path.startsWith("file:") -> SeekableFileStream(File(URL(path).path))
            IOUtil.hasScheme(path) -> SeekablePathStream(IOUtil.getPath(path), wrapper)
            else -> SeekableFileStream(File(path))
        }
    }
}

class SeekableFTPStream @Throws(IOException::class)
@JvmOverloads constructor(url: URL, userPasswordInput: UserPasswordInput? = null) : SeekableStream() {

    internal var helper: SeekableFTPStreamHelper

    init {
        helper = SeekableFTPStreamHelper(url, userPasswordInput)
    }

    override fun seek(position: Long) {
        helper.seek(position)
    }

    override fun position(): Long {
        return helper.position()
    }

    @Throws(IOException::class)
    override fun eof(): Boolean {
        return helper.eof()
    }

    override fun getSource(): String? {
        return null //TODO
    }

    override fun length(): Long {
        return helper.length()
    }


    @Throws(IOException::class)
    override fun skip(n: Long): Long {
        return helper.skip(n)
    }


    @Throws(IOException::class)
    override fun read(buffer: ByteArray, offset: Int, len: Int): Int {
        return helper.read(buffer, offset, len)
    }


    @Throws(IOException::class)
    override fun close() {
        helper.close()
    }

    @Throws(IOException::class)
    override fun read(): Int {
        return helper.read()
    }

    companion object {

        //    private static final String EXPECTED = "Apache Software Foundation";
        private val EXPECTED1 = "\u00cf\u00ac\u00c9\u0075\u0043\u00d4\u00d5\u0079"
        private val EXPECTED2 = "\u00e4\u006c\u0077\u000c\u0016\u00f1\u0030\u008f"
        @Throws(IOException::class)
        @JvmStatic
        fun main(args: Array<String>) {
            //    	String testURL = (args.length < 1) ? "ftp://apache.cs.utah.edu/apache.org/HEADER.html" : args[0];
            val testURL = if (args.size < 1) "ftp://hgdownload.cse.ucsc.edu/goldenPath/panTro3/vsHg19/panTro3.hg19.all.chain.gz" else args[0]
            val startPosition = if (args.size < 2) 0x0b66c78L else java.lang.Long.parseLong(args[1])
            val len = if (args.size < 3) 8 else Integer.parseInt(args[2])
            val skipLen = if (args.size < 4) 0x18 else Integer.parseInt(args[3])
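            // Use the Apache-based SeekableFTPStream defined above; htsjdk's
            // SeekableStreamFactory.getInstance() would return htsjdk's own FTP stream.
            val s = SeekableFTPStream(URL(testURL))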
            val buffer = ByteArray(len)
            s.seek(startPosition)
            s.read(buffer, 0, len)
            if (s.position() != startPosition + len && s.position() != s.length()) {
                println("1) updated position is incorrect")
            }
            val data = String(buffer)
            println("1) read:$data")
            if (args.size == 0) {
                println("1) expected:$EXPECTED1")
                println("1) values do" + (if (EXPECTED1 == data) "" else " not") + " match")
            }
            s.skip(skipLen.toLong())
            s.read(buffer, 0, len)
            if (s.position() != startPosition + (2 * len).toLong() + skipLen.toLong() && s.position() != s.length()) {
                println("2) updated position is incorrect")
            }
            val data2 = String(buffer)
            println("2) read:$data2")
            if (args.size == 0) {
                println("2) expected:$EXPECTED2")
                println("2) values do" + (if (EXPECTED2 == data2) "" else " not") + " match")
            }
        }
    }
}

class SeekableFTPStreamHelper @Throws(IOException::class)
internal constructor(url: URL, private val userPasswordInput: UserPasswordInput?) {

    private var position: Long = 0
    private var contentLength: Long = -1
    private val host: String
    private val path: String
    private var userInfo: String?

    internal var ftp: FTPClient?

    init {
        this.userInfo = url.userInfo
        this.host = url.host
        this.path = url.path

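        val aFtp = ftpConnect()
        ftp = aFtp

        // Determine the file size from the directory listing.
        // No RETR is sent here: a transfer needs a data connection, which read() opens on demand.
        val files = aFtp.listFiles(path)
        if (files.size == 1 && files[0].isFile) {
            contentLength = files[0].size
        }
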
//        aFtp.sendCommand("SIZE", path)
//        val reply = aFtp.replyString
//        if(FTPReply.isPositiveCompletion(aFtp.replyCode)) {
//            contentLength = java.lang.Long.parseLong(reply[])
//        }

    }

    private fun ftpConnect(): FTPClient {
        val ftp = FTPClient()
        ftp.configure(FTPClientConfig())

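        ftp.connect(host)
        // connect() can return normally even if the server rejects us, so check the reply code.
        if (!org.apache.commons.net.ftp.FTPReply.isPositiveCompletion(ftp.replyCode)) {
            ftp.disconnect()
            throw IOException("FTP server $host refused connection")
        }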

        var user = "anonymous"
        var password = "[email protected]"

        //        if (userInfo == null) run { userInfo = FTPUtils.userCredentials.get(host) }
        //        if (userInfo != null) {
        //            val tmp = userInfo.split(":".toRegex()).dropLastWhile { it.isEmpty() }.toTypedArray()
        //            user = tmp[0]
        //            if (tmp.size > 1) {
        //                password = tmp[1]
        //            }
        //        }

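        // TODO: use credentials from userInfo / userPasswordInput
        if (!ftp.login(user, password)) {
            val reply = ftp.replyString
            ftp.disconnect()
            throw IOException("FTP login to $host failed: $reply")
        }

        ftp.setFileType(org.apache.commons.net.ftp.FTP.BINARY_FILE_TYPE)

        return ftp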
    }

    fun seek(position: Long) {
        this.position = position
    }

    fun position(): Long {
        return position
    }

    @Throws(IOException::class)
    fun eof(): Boolean {
        return false
    }

    fun length(): Long {
        return contentLength
    }


    @Throws(IOException::class)
    fun skip(n: Long): Long {
        position += n
        if (ftp != null) {
            ftp!!.restartOffset = position
        }
        return n
    }

    @Throws(IOException::class)
    fun read(buffer: ByteArray, offset: Int, len: Int): Int {

        if (ftp == null) {
            ftp = ftpConnect()
        }

        if (offset < 0 || len < 0 || offset + len > buffer.size) {
            throw IndexOutOfBoundsException()
        }

        if (len == 0) {
            return 0
        }

        var n = 0
        try {

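            // Use passive mode for the data connection opened by retrieveFileStream().
            // (A raw pasv() call only sends the PASV command; it does not switch the client mode.)
            ftp!!.enterLocalPassiveMode()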

            // If we are positioned at or beyond the EOF return -1
            if (contentLength >= 0 && position >= contentLength) {
                return -1
            }

            if (position > 0) ftp!!.restartOffset = position

            //reply = ftp!!.retr(path)

//            val `is` = ftp!!.dataStream
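            // retrieveFileStream() returns null if the data connection cannot be opened.
            val `is` = ftp!!.retrieveFileStream(path)
                    ?: throw IOException("Cannot retrieve $path: ${ftp!!.replyString}")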

            while (n < len) {
                val count = `is`.read(buffer, offset + n, len - n)
                if (count < 0) {
                    // End of stream: report -1 only if nothing was read at all.
                    if (n == 0) {
                        return -1
                    }
                    break
                }
                n += count
            }

            position += n.toLong()

            return n
        } catch (e: EOFException) {
            if (n < 0) {
                return -1
            } else {
                position += n.toLong()
                return n
            }
        } finally {
            // ALWAYS close the FTP connection; this is more robust than trying to reuse it,
            // and we don't want open connections hanging about
            ftp!!.disconnect()
            ftp = null
        }
    }


    @Throws(IOException::class)
    private fun reconnect() {
        if (ftp != null) {
            ftp!!.disconnect()
        }
        ftp = ftpConnect()
    }


    @Throws(IOException::class)
    fun close() {
        if (ftp != null) {
            ftp!!.disconnect()
            ftp = null
        }
    }


    @Throws(IOException::class)
    fun read(): Int {
        throw UnsupportedOperationException("read() is not supported on SeekableFTPStream.  Must read in blocks.")
    }

}

iromeo commented Jun 16, 2018

Also see "FTP client can't handle URL encoded file names" (samtools/htsjdk#464).
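
For context, `java.net.URL.getPath()` returns the path as written, with percent-escapes intact, and that raw path is what the proof of concept above hands to the FTP client. One possible way to obtain a decoded path is `java.net.URI.getPath()`. The helper name `decodedFtpPath` is hypothetical, and this is a sketch of the idea rather than the fix adopted in htsjdk.

```kotlin
import java.net.URI

/**
 * Hypothetical helper: returns the percent-decoded path of a URL, suitable
 * for passing to an FTP command. URI.getPath() decodes escapes such as %20,
 * whereas URI.getRawPath() would return them untouched.
 */
fun decodedFtpPath(url: String): String =
    URI(url).path ?: throw IllegalArgumentException("URL has no path: $url")
```

Unlike `java.net.URLDecoder`, which targets form encoding, `URI.getPath()` leaves a literal `+` in a file name unchanged.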

iromeo self-assigned this Jun 19, 2018

iromeo commented Jun 22, 2018

By the way, HTTP links to the Blueprint FTP server work OK, e.g. http://ftp.ebi.ac.uk/pub/databases/blueprint/data/homo_sapiens/GRCh38/cord_blood/C005PS/CD14-positive_CD16-negative_classical_monocyte/Bisulfite-Seq/CNAG/C005PS51.CPG_methylation_calls.bs_call.GRCh38.20160531.bw
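
Since the same files are reachable over HTTP, one possible stopgap is to rewrite `ftp://` URLs to `http://` for hosts known to serve both. This is only a sketch: `preferHttp` and `httpMirroredHosts` are hypothetical names, and the assumption that a host exposes its whole FTP tree over HTTP has to be verified per host.

```kotlin
import java.net.URI

/** Hosts assumed, for illustration only, to serve the same tree over HTTP and FTP. */
val httpMirroredHosts = setOf("ftp.ebi.ac.uk")

/**
 * Hypothetical helper: swaps the scheme of an ftp:// URL to http:// when the
 * host is listed in [httpMirroredHosts]; every other URL is returned unchanged.
 */
fun preferHttp(url: String): String {
    val uri = URI(url)
    return if (uri.scheme == "ftp" && uri.host in httpMirroredHosts) {
        "http" + url.substring("ftp".length) // keep host, path and query as written
    } else {
        url
    }
}
```

The rewritten URL would then go through the existing HTTP branch of `getStreamFor`, bypassing the FTP client entirely.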

olegs transferred this issue from another repository Mar 22, 2019

dievsky commented Mar 25, 2021

Is this issue still relevant?

iromeo commented Mar 25, 2021

@dievsky Yes, it still doesn't work for me. Does it work on your machine?

dievsky commented Mar 25, 2021

It doesn't work, but not because of the socket timeout. The exception that I get is

java.lang.UnsupportedOperationException: read() is not supported on SeekableHTTPStream.  Must read in blocks.

when trying to "read one byte from url to be 100% sure". This is from URIExtensions.kt:108.
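
One way a block-only stream could still answer a single-byte `read()` is to delegate to the block read with a one-byte buffer. The sketch below uses a plain `java.io.InputStream` rather than htsjdk's `SeekableStream`, and `BlockOnlyStream` is a hypothetical name; it illustrates the delegation and is not how htsjdk behaves.

```kotlin
import java.io.InputStream

/**
 * Hypothetical base class: subclasses implement only the block read, and the
 * single-byte read() is derived from it.
 */
abstract class BlockOnlyStream : InputStream() {
    abstract override fun read(buffer: ByteArray, offset: Int, len: Int): Int

    override fun read(): Int {
        val one = ByteArray(1)
        val count = read(one, 0, 1)
        // InputStream.read() must return the byte as an unsigned value 0..255, or -1 at EOF.
        return if (count <= 0) -1 else (one[0].toInt() and 0xFF)
    }
}
```

With a helper that opens a new FTP connection per read, as in the proof of concept above, each single-byte read would cost a full connection, so this suits occasional probing rather than bulk reading.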

dievsky commented Mar 25, 2021

(I just wanted to check up on an issue that has been dormant for almost 3 years.)
