
Question/Feature Request: download_to_byte_array ? #289

Open · yxiang92128 opened this issue Sep 12, 2019 · 32 comments

@yxiang92128
Hi,

Currently there are only download_to_stream and download_to_file APIs, and download_to_stream allocates memory on the heap during the download, which increases the per-thread footprint of our application. We have preallocated buffers (uint8 byte arrays) that we would prefer to use to receive the downloaded bytes, and we wonder if there is a method we can rely on to fetch block blob contents directly into such a buffer by passing its pointer to the SDK. If not, is there a workaround you could propose?

Thanks again,

Yang

@Jinming-Hu
Member

Hi, currently we don't support this feature, mainly because of a limitation in cpprestsdk. In cpprestsdk, buffers are managed with std::shared_ptr to better maintain their lifetimes, and a user-provided buffer is passed to cpprestsdk by value, so there will be a copy.
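
To see where the copy comes from, here is a minimal illustration (my own sketch, not SDK code) using cpprestsdk's stock container_buffer, whose constructor takes the collection by value:

#include <cpprest/containerstream.h>
#include <cstdint>
#include <vector>

int main()
{
    std::vector<uint8_t> data(1024, 0);

    // container_buffer takes the collection by value, so all 1024 bytes are
    // copied into the buffer's internal, shared_ptr-managed state.
    concurrency::streams::container_buffer<std::vector<uint8_t>> buf(data);

    // `data` itself is never written to; reads and writes go to the internal copy.
    return 0;
}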

After some research into the cpprestsdk source code, I came up with a workaround. We can create a derived class that accepts a reference instead of a copy as its parameter. As long as we make sure the container outlives the HTTP request/response, everything should work just fine.

Here is a demonstration. You may want to make changes to fit your needs.

#include <pplx/pplxtasks.h>
#include <cpprest/streams.h>

template<typename _CollectionType>
class basic_managed_external_container_buffer : public Concurrency::streams::details::streambuf_state_manager<typename _CollectionType::value_type>
{
public:
    typedef typename _CollectionType::value_type _CharType;
    typedef typename Concurrency::streams::details::basic_streambuf<_CharType>::traits traits;
    typedef typename Concurrency::streams::details::basic_streambuf<_CharType>::int_type int_type;
    typedef typename Concurrency::streams::details::basic_streambuf<_CharType>::pos_type pos_type;
    typedef typename Concurrency::streams::details::basic_streambuf<_CharType>::off_type off_type;

    _CollectionType& collection() { return m_data; }

    virtual ~basic_managed_external_container_buffer()
    {
        // Invoke the synchronous versions since we need to
        // purge the request queue before deleting the buffer
        this->_close_read();
        this->_close_write();
    }

protected:
    virtual bool can_seek() const { return this->is_open(); }

    virtual bool has_size() const { return this->is_open(); }

    virtual utility::size64_t size() const { return utility::size64_t(m_data.size()); }

    virtual size_t buffer_size(std::ios_base::openmode = std::ios_base::in) const { return 0; }

    virtual void set_buffer_size(size_t, std::ios_base::openmode = std::ios_base::in) { return; }

    virtual size_t in_avail() const
    {
        // See the comment in seek around the restriction that we do not allow read head to
        // seek beyond the current write_end.
        _ASSERTE(m_current_position <= m_data.size());

        msl::safeint3::SafeInt<size_t> readhead(m_current_position);
        msl::safeint3::SafeInt<size_t> writeend(m_data.size());
        return (size_t)(writeend - readhead);
    }

    virtual pplx::task<bool> _sync() { return pplx::task_from_result(true); }

    virtual pplx::task<int_type> _putc(_CharType ch)
    {
        int_type retVal = (this->write(&ch, 1) == 1) ? static_cast<int_type>(ch) : traits::eof();
        return pplx::task_from_result<int_type>(retVal);
    }

    virtual pplx::task<size_t> _putn(const _CharType* ptr, size_t count)
    {
        return pplx::task_from_result<size_t>(this->write(ptr, count));
    }

    _CharType* _alloc(size_t count)
    {
        if (!this->can_write()) return nullptr;

        // Allocate space
        resize_for_write(m_current_position + count);

        // Let the caller copy the data
        return (_CharType*)&m_data[m_current_position];
    }

    void _commit(size_t actual)
    {
        // Update the write position and satisfy any pending reads
        update_current_position(m_current_position + actual);
    }

    virtual bool acquire(_Out_ _CharType*& ptr, _Out_ size_t& count)
    {
        ptr = nullptr;
        count = 0;

        if (!this->can_read()) return false;

        count = in_avail();

        if (count > 0)
        {
            ptr = (_CharType*)&m_data[m_current_position];
            return true;
        }
        else
        {
            // Can only be open for read OR write, not both. If there is no data then
            // we have reached the end of the stream so indicate such with true.
            return true;
        }
    }

    virtual void release(_Out_writes_opt_(count) _CharType* ptr, _In_ size_t count)
    {
        if (ptr != nullptr) update_current_position(m_current_position + count);
    }

    virtual pplx::task<size_t> _getn(_Out_writes_(count) _CharType* ptr, _In_ size_t count)
    {
        return pplx::task_from_result(this->read(ptr, count));
    }

    size_t _sgetn(_Out_writes_(count) _CharType* ptr, _In_ size_t count) { return this->read(ptr, count); }

    virtual size_t _scopy(_Out_writes_(count) _CharType* ptr, _In_ size_t count)
    {
        return this->read(ptr, count, false);
    }

    virtual pplx::task<int_type> _bumpc() { return pplx::task_from_result(this->read_byte(true)); }

    virtual int_type _sbumpc() { return this->read_byte(true); }

    virtual pplx::task<int_type> _getc() { return pplx::task_from_result(this->read_byte(false)); }

    int_type _sgetc() { return this->read_byte(false); }

    virtual pplx::task<int_type> _nextc()
    {
        this->read_byte(true);
        return pplx::task_from_result(this->read_byte(false));
    }

    virtual pplx::task<int_type> _ungetc()
    {
        auto pos = seekoff(-1, std::ios_base::cur, std::ios_base::in);
        if (pos == (pos_type)traits::eof()) return pplx::task_from_result(traits::eof());
        return this->getc();
    }

    virtual pos_type getpos(std::ios_base::openmode mode) const
    {
        if (((mode & std::ios_base::in) && !this->can_read()) || ((mode & std::ios_base::out) && !this->can_write()))
            return static_cast<pos_type>(traits::eof());

        return static_cast<pos_type>(m_current_position);
    }

    virtual pos_type seekpos(pos_type position, std::ios_base::openmode mode)
    {
        pos_type beg(0);

        // In order to support relative seeking from the end position we need to fix an end position.
        // Technically, there is no end for the stream buffer as new writes would just expand the buffer.
        // For now, we assume that the current write_end is the end of the buffer. We use this artificial
        // end to restrict the read head from seeking beyond what is available.

        pos_type end(m_data.size());

        if (position >= beg)
        {
            auto pos = static_cast<size_t>(position);

            // Read head
            if ((mode & std::ios_base::in) && this->can_read())
            {
                if (position <= end)
                {
                    // We do not allow reads to seek beyond the end or before the start position.
                    update_current_position(pos);
                    return static_cast<pos_type>(m_current_position);
                }
            }

            // Write head
            if ((mode & std::ios_base::out) && this->can_write())
            {
                // Allocate space
                resize_for_write(pos);

                // Nothing to really copy

                // Update write head and satisfy read requests if any
                update_current_position(pos);

                return static_cast<pos_type>(m_current_position);
            }
        }

        return static_cast<pos_type>(traits::eof());
    }

    virtual pos_type seekoff(off_type offset, std::ios_base::seekdir way, std::ios_base::openmode mode)
    {
        pos_type beg = 0;
        pos_type cur = static_cast<pos_type>(m_current_position);
        pos_type end = static_cast<pos_type>(m_data.size());

        switch (way)
        {
            case std::ios_base::beg: return seekpos(beg + offset, mode);

            case std::ios_base::cur: return seekpos(cur + offset, mode);

            case std::ios_base::end: return seekpos(end + offset, mode);

            default: return static_cast<pos_type>(traits::eof());
        }
    }

private:
    template<typename _CollectionType1>
    friend class managed_external_container_buffer;

    basic_managed_external_container_buffer(_CollectionType& data, std::ios_base::openmode mode)
        : Concurrency::streams::details::streambuf_state_manager<typename _CollectionType::value_type>(mode)
        , m_data(data)
        , m_current_position(0)
    {
        validate_mode(mode);
    }

    static void validate_mode(std::ios_base::openmode mode)
    {
        // Disallow simultaneous use of the stream buffer for writing and reading.
        if ((mode & std::ios_base::in) && (mode & std::ios_base::out))
            throw std::invalid_argument("this combination of modes on container stream not supported");
    }

    bool can_satisfy(size_t)
    {
        // We can always satisfy a read, at least partially, unless the
        // read position is at the very end of the buffer.
        return (in_avail() > 0);
    }

    int_type read_byte(bool advance = true)
    {
        _CharType value;
        auto read_size = this->read(&value, 1, advance);
        return read_size == 1 ? static_cast<int_type>(value) : traits::eof();
    }

    size_t read(_Out_writes_(count) _CharType* ptr, _In_ size_t count, bool advance = true)
    {
        if (!can_satisfy(count)) return 0;

        msl::safeint3::SafeInt<size_t> request_size(count);
        msl::safeint3::SafeInt<size_t> read_size = request_size.Min(in_avail());

        size_t newPos = m_current_position + read_size;

        auto readBegin = std::begin(m_data) + m_current_position;
        auto readEnd = std::begin(m_data) + newPos;

#if defined(_ITERATOR_DEBUG_LEVEL) && _ITERATOR_DEBUG_LEVEL != 0
        // Avoid warning C4996: Use checked iterators under SECURE_SCL
        std::copy(readBegin, readEnd, stdext::checked_array_iterator<_CharType*>(ptr, count));
#else
        std::copy(readBegin, readEnd, ptr);
#endif // _ITERATOR_DEBUG_LEVEL

        if (advance)
        {
            update_current_position(newPos);
        }

        return (size_t)read_size;
    }

    size_t write(const _CharType* ptr, size_t count)
    {
        if (!this->can_write() || (count == 0)) return 0;

        auto newSize = m_current_position + count;

        // Allocate space
        resize_for_write(newSize);

        // Copy the data
        std::copy(ptr, ptr + count, std::begin(m_data) + m_current_position);

        // Update write head and satisfy pending reads if any
        update_current_position(newSize);

        return count;
    }

    void resize_for_write(size_t newPos)
    {
        // Resize the container if required
        if (newPos > m_data.size())
        {
            m_data.resize(newPos);
        }
    }

    void update_current_position(size_t newPos)
    {
        // The new write head
        m_current_position = newPos;
        _ASSERTE(m_current_position <= m_data.size());
    }

    // The actual data store
    _CollectionType& m_data;

    // Read/write head
    size_t m_current_position;
};

template<typename _CollectionType>
class managed_external_container_buffer : public Concurrency::streams::streambuf<typename _CollectionType::value_type>
{
public:
    typedef typename _CollectionType::value_type char_type;

    managed_external_container_buffer(_CollectionType& data, std::ios_base::openmode mode = std::ios_base::in)
        : Concurrency::streams::streambuf<typename _CollectionType::value_type>(
            std::shared_ptr<basic_managed_external_container_buffer<_CollectionType>>(
                new basic_managed_external_container_buffer<_CollectionType>(data, mode)))
    {}

    _CollectionType& collection() const
    {
        auto listBuf = static_cast<basic_managed_external_container_buffer<_CollectionType>*>(this->get_base().get());
        return listBuf->collection();
    }
};

Use it like this:

std::vector<uint8_t> data;
data.resize(filesize);  // Do make sure it's big enough
blob.download_to_stream(managed_external_container_buffer<std::vector<uint8_t>>(data, std::ios_base::out).create_ostream());

cpprestsdk will still annoyingly allocate and free some small buffers (1 MB-4 MB) repeatedly during download. I didn't study it further, but I found that disabling parallel download and checksum validation eliminates that:

azure::storage::blob_request_options options;
options.set_use_transactional_md5(false);
options.set_parallelism_factor(1);
blob.download_to_stream(managed_external_container_buffer<std::vector<uint8_t>>(data, std::ios_base::out).create_ostream(), azure::storage::access_condition(), options, azure::storage::operation_context());

I use std::vector<uint8_t> as an example, but the approach also works for a raw array.

@yxiang92128
Author

Thanks. We will give it a try.
@JinmingHu-MSFT

@yxiang92128
Author

yxiang92128 commented Sep 19, 2019

@JinmingHu-MSFT
We have loads of questions as we begin full deployment of your SDK into our products, so please bear with me.
For managed_external_container_buffer: if we pass in a raw array of char, i.e. char[] or a pointer to a char array, it does not compile, because the template expects a container type. Is there a workaround? We basically have a preallocated buffer of chars we want to use in download_to_stream, and we don't want the SDK to call malloc at all.

With std::iostream, we were able to do the following, and we hope we can do something similar with the Azure SDK / cpprestsdk:

std::stringstream* strm = new std::stringstream;
strm->rdbuf()->pubsetbuf((char*)buf, len);  // sets the backing char sequence to a user-provided char array
std::shared_ptr<std::basic_iostream<char>> strmp(strm);

Thanks for helping out.

Yang

@Jinming-Hu
Member

@yxiang92128 Hi, I'm glad I can help. In my previous reply, by saying it also works for a raw array, I didn't mean the code snippet works for raw arrays as-is; you need to modify it slightly to fit your needs. If you don't know what to do, reply here and I can help you.

Did you try it with std::vector<uint8_t>? Does it work?

By the way, I don't think it's possible to completely avoid malloc; there will always be allocations of a few dozen bytes. But avoiding big allocations (say, larger than 500 KB) should be possible, I think.

@yxiang92128
Author

@JinmingHu-MSFT
Any hint on how to modify your managed_external_container_buffer to support a raw array would be a great help to us. Thanks.
std::vector<uint8_t> will not work for us; our interface is a C program which only accommodates raw arrays of char.

@Jinming-Hu
Member

Hi Yang, give this a try:

#include <pplx/pplxtasks.h>
#include <cpprest/streams.h>

class basic_managed_external_raw_buffer : public Concurrency::streams::details::streambuf_state_manager<uint8_t>
{
public:
    typedef uint8_t _CharType;
    typedef typename Concurrency::streams::details::basic_streambuf<_CharType>::traits traits;
    typedef typename Concurrency::streams::details::basic_streambuf<_CharType>::int_type int_type;
    typedef typename Concurrency::streams::details::basic_streambuf<_CharType>::pos_type pos_type;
    typedef typename Concurrency::streams::details::basic_streambuf<_CharType>::off_type off_type;

    uint8_t* collection() { return m_data; }

    virtual ~basic_managed_external_raw_buffer()
    {
        // Invoke the synchronous versions since we need to
        // purge the request queue before deleting the buffer
        this->_close_read();
        this->_close_write();
    }

protected:
    virtual bool can_seek() const { return this->is_open(); }

    virtual bool has_size() const { return this->is_open(); }

    virtual utility::size64_t size() const { return utility::size64_t(m_size); }

    virtual size_t buffer_size(std::ios_base::openmode = std::ios_base::in) const { return 0; }

    virtual void set_buffer_size(size_t, std::ios_base::openmode = std::ios_base::in) { return; }

    virtual size_t in_avail() const
    {
        // See the comment in seek around the restriction that we do not allow read head to
        // seek beyond the current write_end.
        _ASSERTE(m_current_position <= m_size);

        msl::safeint3::SafeInt<size_t> readhead(m_current_position);
        msl::safeint3::SafeInt<size_t> writeend(m_size);
        return (size_t)(writeend - readhead);
    }

    virtual pplx::task<bool> _sync() { return pplx::task_from_result(true); }

    virtual pplx::task<int_type> _putc(_CharType ch)
    {
        int_type retVal = (this->write(&ch, 1) == 1) ? static_cast<int_type>(ch) : traits::eof();
        return pplx::task_from_result<int_type>(retVal);
    }

    virtual pplx::task<size_t> _putn(const _CharType* ptr, size_t count)
    {
        return pplx::task_from_result<size_t>(this->write(ptr, count));
    }

    _CharType* _alloc(size_t count)
    {
        if (!this->can_write()) return nullptr;

        // Allocate space
        resize_for_write(m_current_position + count);

        // Let the caller copy the data
        return (_CharType*)&m_data[m_current_position];
    }

    void _commit(size_t actual)
    {
        // Update the write position and satisfy any pending reads
        update_current_position(m_current_position + actual);
    }

    virtual bool acquire(_Out_ _CharType*& ptr, _Out_ size_t& count)
    {
        ptr = nullptr;
        count = 0;

        if (!this->can_read()) return false;

        count = in_avail();

        if (count > 0)
        {
            ptr = (_CharType*)&m_data[m_current_position];
            return true;
        }
        else
        {
            // Can only be open for read OR write, not both. If there is no data then
            // we have reached the end of the stream so indicate such with true.
            return true;
        }
    }

    virtual void release(_Out_writes_opt_(count) _CharType* ptr, _In_ size_t count)
    {
        if (ptr != nullptr) update_current_position(m_current_position + count);
    }

    virtual pplx::task<size_t> _getn(_Out_writes_(count) _CharType* ptr, _In_ size_t count)
    {
        return pplx::task_from_result(this->read(ptr, count));
    }

    size_t _sgetn(_Out_writes_(count) _CharType* ptr, _In_ size_t count) { return this->read(ptr, count); }

    virtual size_t _scopy(_Out_writes_(count) _CharType* ptr, _In_ size_t count)
    {
        return this->read(ptr, count, false);
    }

    virtual pplx::task<int_type> _bumpc() { return pplx::task_from_result(this->read_byte(true)); }

    virtual int_type _sbumpc() { return this->read_byte(true); }

    virtual pplx::task<int_type> _getc() { return pplx::task_from_result(this->read_byte(false)); }

    int_type _sgetc() { return this->read_byte(false); }

    virtual pplx::task<int_type> _nextc()
    {
        this->read_byte(true);
        return pplx::task_from_result(this->read_byte(false));
    }

    virtual pplx::task<int_type> _ungetc()
    {
        auto pos = seekoff(-1, std::ios_base::cur, std::ios_base::in);
        if (pos == (pos_type)traits::eof()) return pplx::task_from_result(traits::eof());
        return this->getc();
    }

    virtual pos_type getpos(std::ios_base::openmode mode) const
    {
        if (((mode & std::ios_base::in) && !this->can_read()) || ((mode & std::ios_base::out) && !this->can_write()))
            return static_cast<pos_type>(traits::eof());

        return static_cast<pos_type>(m_current_position);
    }

    virtual pos_type seekpos(pos_type position, std::ios_base::openmode mode)
    {
        pos_type beg(0);

        // In order to support relative seeking from the end position we need to fix an end position.
        // Technically, there is no end for the stream buffer as new writes would just expand the buffer.
        // For now, we assume that the current write_end is the end of the buffer. We use this artificial
        // end to restrict the read head from seeking beyond what is available.

        pos_type end(m_size);

        if (position >= beg)
        {
            auto pos = static_cast<size_t>(position);

            // Read head
            if ((mode & std::ios_base::in) && this->can_read())
            {
                if (position <= end)
                {
                    // We do not allow reads to seek beyond the end or before the start position.
                    update_current_position(pos);
                    return static_cast<pos_type>(m_current_position);
                }
            }

            // Write head
            if ((mode & std::ios_base::out) && this->can_write())
            {
                // Allocate space
                resize_for_write(pos);

                // Nothing to really copy

                // Update write head and satisfy read requests if any
                update_current_position(pos);

                return static_cast<pos_type>(m_current_position);
            }
        }

        return static_cast<pos_type>(traits::eof());
    }

    virtual pos_type seekoff(off_type offset, std::ios_base::seekdir way, std::ios_base::openmode mode)
    {
        pos_type beg = 0;
        pos_type cur = static_cast<pos_type>(m_current_position);
        pos_type end = static_cast<pos_type>(m_size);

        switch (way)
        {
            case std::ios_base::beg: return seekpos(beg + offset, mode);

            case std::ios_base::cur: return seekpos(cur + offset, mode);

            case std::ios_base::end: return seekpos(end + offset, mode);

            default: return static_cast<pos_type>(traits::eof());
        }
    }

private:
    friend class managed_external_raw_buffer;

    basic_managed_external_raw_buffer(uint8_t* data, size_t size, std::ios_base::openmode mode)
        : Concurrency::streams::details::streambuf_state_manager<uint8_t>(mode)
        , m_data(data)
        , m_size(size)
        , m_current_position(0)
    {
        validate_mode(mode);
    }

    static void validate_mode(std::ios_base::openmode mode)
    {
        // Disallow simultaneous use of the stream buffer for writing and reading.
        if ((mode & std::ios_base::in) && (mode & std::ios_base::out))
            throw std::invalid_argument("this combination of modes on container stream not supported");
    }

    bool can_satisfy(size_t)
    {
        // We can always satisfy a read, at least partially, unless the
        // read position is at the very end of the buffer.
        return (in_avail() > 0);
    }

    int_type read_byte(bool advance = true)
    {
        _CharType value;
        auto read_size = this->read(&value, 1, advance);
        return read_size == 1 ? static_cast<int_type>(value) : traits::eof();
    }

    size_t read(_Out_writes_(count) _CharType* ptr, _In_ size_t count, bool advance = true)
    {
        if (!can_satisfy(count)) return 0;

        msl::safeint3::SafeInt<size_t> request_size(count);
        msl::safeint3::SafeInt<size_t> read_size = request_size.Min(in_avail());

        size_t newPos = m_current_position + read_size;

        auto readBegin = m_data + m_current_position;
        auto readEnd = m_data + newPos;

#if defined(_ITERATOR_DEBUG_LEVEL) && _ITERATOR_DEBUG_LEVEL != 0
        // Avoid warning C4996: Use checked iterators under SECURE_SCL
        std::copy(readBegin, readEnd, stdext::checked_array_iterator<_CharType*>(ptr, count));
#else
        std::copy(readBegin, readEnd, ptr);
#endif // _ITERATOR_DEBUG_LEVEL

        if (advance)
        {
            update_current_position(newPos);
        }

        return (size_t)read_size;
    }

    size_t write(const _CharType* ptr, size_t count)
    {
        if (!this->can_write() || (count == 0)) return 0;

        auto newSize = m_current_position + count;

        // Allocate space
        resize_for_write(newSize);

        // Copy the data
        std::copy(ptr, ptr + count, m_data + m_current_position);

        // Update write head and satisfy pending reads if any
        update_current_position(newSize);

        return count;
    }

    void resize_for_write(size_t newPos)
    {
        // Resize the container if required
        if (newPos > m_size)
        {
            throw std::runtime_error("not supported");
            //m_data.resize(newPos);
        }
    }

    void update_current_position(size_t newPos)
    {
        // The new write head
        m_current_position = newPos;
        _ASSERTE(m_current_position <= m_size);
    }

    // The actual data store
    uint8_t* m_data;
    size_t m_size;

    // Read/write head
    size_t m_current_position;
};

class managed_external_raw_buffer : public Concurrency::streams::streambuf<uint8_t>
{
public:
    typedef uint8_t char_type;

    managed_external_raw_buffer(uint8_t* data, size_t size, std::ios_base::openmode mode = std::ios_base::in)
        : Concurrency::streams::streambuf<uint8_t>(
            std::shared_ptr<basic_managed_external_raw_buffer>(
                new basic_managed_external_raw_buffer(data, size, mode)))
    {}

    uint8_t* collection() const
    {
        auto listBuf = static_cast<basic_managed_external_raw_buffer*>(this->get_base().get());
        return listBuf->collection();
    }
};

Use it like this:

uint8_t* data = (uint8_t*)malloc(filesize);  // Again, do make sure it's big enough
blob.download_to_stream(managed_external_raw_buffer(data, filesize, std::ios_base::out).create_ostream(), azure::storage::access_condition(), options, azure::storage::operation_context());

Since you mentioned you are calling C++ code from C, please make sure you handle exceptions properly.

Tell me if you have any further questions.

@yxiang92128
Author

@JinmingHu-MSFT We are experimenting with your code snippet and will keep you updated. Thanks so much. Have a great weekend.

@yxiang92128
Author

@JinmingHu-MSFT
Our initial test results are good with the managed_external_raw_buffer. Thanks so much for your help!
A follow up question:
Using a container buffer before, I was able to check buffer.collection().size() at the end for the actual number of bytes returned by the block_blob.download_range_to_stream call, because that call itself returns void.
Using managed_external_raw_buffer, since the buffer is preallocated by the caller (i.e. m_size is fixed), we can't tell how many bytes were actually read off the blob. Any clue how we could still obtain that information?

Thanks again,

Yang

@Jinming-Hu
Member

@yxiang92128 This doesn't look easy once multithreaded download is taken into account, but I instinctively feel it's doable. I need to study the cpprestsdk source code further; I will get back to you later.

@yxiang92128
Author

yxiang92128 commented Sep 23, 2019

@JinmingHu-MSFT I don't think setting parallelism_factor to anything other than 1 would work with the raw buffer you implemented for the download_range_to_stream call, as you mentioned before. If I set parallelism_factor to 2, for instance, and download a chunk from the middle of a blob, the content that comes back does not match the original segment.

@Jinming-Hu
Member

Yeah, if we only consider single-threaded download, then I think m_current_position is just the number of bytes actually downloaded. There happens to be a function, getpos, which returns this value.

managed_external_raw_buffer buffer(data, filesize, std::ios_base::out);
blob.download_to_stream(buffer.create_ostream(), azure::storage::access_condition(), options, azure::storage::operation_context());
std::cout << buffer.getpos(std::ios_base::out) << std::endl;

Also, we can use download_range_to_stream as a workaround if we need multi-threaded download, roughly as sketched below.
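
Just a sketch I haven't tested; blob, data, filesize, and options are the names from the earlier snippets:

#include <algorithm>
#include <thread>
#include <vector>

const size_t num_threads = 4;
const size_t chunk = (filesize + num_threads - 1) / num_threads;

std::vector<std::thread> workers;
for (size_t i = 0; i < num_threads; ++i)
{
    const size_t offset = i * chunk;
    if (offset >= filesize) break;
    const size_t length = std::min(chunk, filesize - offset);
    workers.emplace_back([&, offset, length]() {
        // Each thread owns a buffer over its own slice of the preallocated
        // array, so no write head is shared between threads.
        managed_external_raw_buffer slice(data + offset, length, std::ios_base::out);
        blob.download_range_to_stream(slice.create_ostream(), offset, length,
            azure::storage::access_condition(), options, azure::storage::operation_context());
    });
}
for (auto& t : workers) t.join();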

@yxiang92128
Author

@JinmingHu-MSFT We are testing the raw buffer thoroughly in our application. Here is another issue I've found.

Is there something special about blobs in "archive" status as far as download is concerned? Does the download fetch the first 233 bytes, or does the buffer need to be at least that big? I understand it should throw an exception, because a blob in archive state does NOT support download, but it should not crash when the download buffer is less than 233 bytes long (anything less than 233 causes a crash).
See attached code which generates a crash.
test_azure.cpp.txt
compile_test_azure.txt

Currently, as a workaround, I check blob_tier_status and, if it is "archive", do not proceed with the download operation; but I would like to understand why the crash occurs and its potential impact on other types of blobs.

If I allocate in_buffer to be 233 bytes long:

int bufsiz = 233;
char* optr = new char[bufsiz];
managed_external_raw_buffer in_buffer((uint8_t*) optr, bufsiz, std::ios_base::out);

And with download size = 128, the correct exception was thrown and there was no crash/coredump:

starting number for this thread is: 0
download_blob_operation container=nos-blob-tiers obj=data/archive/archive.json.gz offset=0 size=128
Finding container: nos-blob-tiers
tracked sslv23 2 here
modified on=132153067390000000 Fri, 11 Oct 2019 22:32:19 GMT
download size =128 offset=0
Storage exception The object data/archive/archive.json.gz can not be downloaded failed with http code=<409> partial buffer size=0 request length=128 download of data/archive/archive.json.gz failed

However, if I allocate in_buffer to be 232 bytes long (one less than before) with the same 128-byte download size, it crashes:
starting number for this thread is: 0
download_blob_operation container=nos-blob-tiers obj=data/archive/archive.json.gz offset=0 size=128
Finding container: nos-blob-tiers
tracked sslv23 2 here
modified on=132153067390000000 Fri, 11 Oct 2019 22:32:19 GMT
download size =128 offset=0
terminate called after throwing an instance of 'std::runtime_error'
what(): not supported
Aborted (core dumped)
Thread 36 "test_azure" received signal SIGABRT, Aborted.
[Switching to Thread 0x7fffe0977700 (LWP 21539)]
0x00007ffff28928d7 in raise () from /lib64/libc.so.6
Missing separate debuginfos, use: zypper install libgcc_s1-debuginfo-6.2.1+r239768-2.4.x86_64 libglib-2_0-0-debuginfo-2.48.2-10.2.x86_64 libglibmm-2_4-1-debuginfo-2.48.1-5.5.x86_64 libgmodule-2_0-0-debuginfo-2.48.2-10.2.x86_64 libgobject-2_0-0-debuginfo-2.48.2-10.2.x86_64 liblzma5-debuginfo-5.0.5-4.852.x86_64 libopenssl1_0_0-debuginfo-1.0.2j-60.11.2.x86_64 libpcre1-debuginfo-8.39-7.1.x86_64 libsigc-2_0-0-debuginfo-2.8.0-6.2.x86_64 libstdc++6-debuginfo-6.2.1+r239768-2.4.x86_64 libuuid1-debuginfo-2.29.2-2.3.x86_64 libxml2-2-debuginfo-2.9.4-46.3.2.x86_64 libz1-debuginfo-1.2.8-11.1.x86_64
(gdb) bt
#0 0x00007ffff28928d7 in raise () at /lib64/libc.so.6
#1 0x00007ffff2893caa in abort () at /lib64/libc.so.6
#2 0x00007ffff31ad72d in __gnu_cxx::__verbose_terminate_handler() () at /usr/lib64/libstdc++.so.6
#3 0x00007ffff31ab706 in () at /usr/lib64/libstdc++.so.6
#4 0x00007ffff31aa739 in () at /usr/lib64/libstdc++.so.6
#5 0x00007ffff31ab08d in __gxx_personality_v0 () at /usr/lib64/libstdc++.so.6
#6 0x00007ffff2c11493 in () at /lib64/libgcc_s.so.1
#7 0x00007ffff2c11997 in _Unwind_Resume () at /lib64/libgcc_s.so.1
#8 0x00007ffff769858c in boost::asio::detail::task_io_service::run(boost::system::error_code&) (this=0x7e3c80, ec=...)
at /usr/local/include/boost/asio/detail/impl/task_io_service.ipp:152
#9 0x00007ffff7698da7 in boost::asio::io_service::run() (this=0x7ffff7bbe808 <(anonymous namespace)::initialize_shared_threadpool(unsigned long)::uninit_threadpool+8>) at /usr/local/include/boost/asio/impl/io_service.ipp:59
#10 0x00007ffff7785def in (anonymous namespace)::threadpool_impl::thread_start(void*) (arg=0x7ffff7bbe800 <(anonymous namespace)::initialize_shared_threadpool(unsigned long)::uninit_threadpool>)
at /home/yang/cpprestsdk-2.10.11/Release/src/pplx/threadpool.cpp:79
#11 0x00007ffff7785d23 in (anonymous namespace)::threadpool_impl::__lambda124::operator()() const (__closure=0x7e6308)
at /home/yang/cpprestsdk-2.10.11/Release/src/pplx/threadpool.cpp:64
#12 0x00007ffff7786814 in boost::asio::detail::posix_thread::func<(anonymous namespace)::threadpool_impl::add_thread()::__lambda124>::run(void) (this=0x7e6300) at /usr/local/include/boost/asio/detail/posix_thread.hpp:82
#13 0x00007ffff769dca8 in boost::asio::detail::boost_asio_detail_posix_thread_function(void*) (arg=0x7e6300)
at /usr/local/include/boost/asio/detail/impl/posix_thread.ipp:64
#14 0x00007ffff7bc7744 in start_thread () at /lib64/libpthread.so.0
#15 0x00007ffff2947aad in clone () at /lib64/libc.so.6
(gdb)

Jinming-Hu self-assigned this Oct 17, 2019
@Jinming-Hu
Member

Jinming-Hu commented Oct 17, 2019

@yxiang92128 Hi, this is expected behavior. If you use an HTTP traffic capture tool, you can see that the HTTP response for downloading an archived blob looks like this:

HTTP/1.1 409 This operation is not permitted on an archived blob.
Content-Length: 233
Content-Type: application/xml
Server: Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0
x-ms-request-id: 00000000-0000-0000-0000-000000000000
x-ms-client-request-id: 00000000-0000-0000-0000-000000000000
x-ms-version: 2019-02-02
x-ms-error-code: BlobArchived
Date: Thu, 17 Oct 2019 04:09:44 GMT

<?xml version="1.0" encoding="utf-8"?><Error><Code>BlobArchived</Code><Message>This operation is not permitted on an archived blob.
RequestId:00000000-0000-0000-0000-000000000000
Time:2019-10-17T04:09:44.0919750Z</Message></Error>

The response body is 233 bytes long. Internally, the C++ SDK puts the HTTP response body into the buffer, whether it's blob data or an error message.

So in your case, you should provide a buffer big enough to hold either the blob data or the error message, whichever is larger. I know this is not easy, since the sizes of the various error messages are hard to predict.

The workaround I can think of is to slightly modify basic_managed_external_raw_buffer::write as follows:

    size_t write(const _CharType* ptr, size_t count)
    {
        if (!this->can_write() || (count == 0)) return 0;

        size_t write_size = std::min(count, m_size - m_current_position);

        // Copy the data
        std::copy(ptr, ptr + write_size, m_data + m_current_position);

        // Update write head and satisfy pending reads if any
        update_current_position(m_current_position + write_size);

        return count;
    }

In this way, if you only care about the HTTP response status code and not about the error message at all, you can just provide a 0-sized buffer. If the HTTP body is larger than the buffer, it's truncated. An exception might be thrown when the SDK tries to parse the truncated HTTP response body for the detailed error reason, but it should be catchable. I'm not 100% sure about this, though; use it only after thorough testing and at your own risk.
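
For example, something like this (my own untested sketch; buf and bufsize stand for whatever array and length you pass in, and the truncating write() above is assumed to be in place):

#include <iostream>

try
{
    managed_external_raw_buffer in_buffer(buf, bufsize, std::ios_base::out);
    blob.download_to_stream(in_buffer.create_ostream(),
        azure::storage::access_condition(), options, azure::storage::operation_context());
}
catch (const azure::storage::storage_exception& e)
{
    // The status code and error message survive even if the buffer was too
    // small to hold the response body, because the sdk keeps the error
    // message in its own string.
    std::cout << "HTTP status: " << e.result().http_status_code() << std::endl;
    std::cout << "Error: " << e.what() << std::endl;
}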

@yxiang92128
Author

yxiang92128 commented Oct 17, 2019

@JinmingHu-MSFT

  1. With your patch above, I experimented with only an 8-byte raw buffer passed in: the same test executed without a crash, and somehow the error message did NOT get truncated either. The azure::storage::storage_exception& e came back with both the HTTP response code and the whole error string beyond the 8-byte boundary:

download_blob_operation container=nos-blob-tiers obj=data/archive/archive.json.gz offset=0 size=8 Finding container: nos-blob-tiers tracked sslv23 2 here modified on=132153067390000000 Fri, 11 Oct 2019 22:32:19 GMT bufsize=8 download size=8 offset=0 Storage exception The object data/archive/archive.json.gz can not be downloaded <This operation is not permitted on an archived blob.> failed with http code=<409> partial buffer size=8 request length=0 download of data/archive/archive.json.gz failed

I wonder why it actually worked!

  2. If basic_managed_external_raw_buffer is not used, how big a buffer does the SDK create internally to receive the entire HTTP response body under an error condition?

  3. If there is no error, the raw buffer supplied via basic_managed_external_raw_buffer will contain only the blob data, correct? I don't think the HTTP response header is put in the buffer in this case. The key thing is: if we supply a 128-byte buffer and 128 bytes of blob data are returned, the buffer should contain exactly that.

Thanks as always!

Yang

@Jinming-Hu
Member

@yxiang92128

  1. I just looked through the source code; it's kind of complicated. But I think in the error case cpprestsdk uses a separate std::string to store the error message, and what ends up in the user-provided stream (in your case, the buffer) is just a copy of the HTTP response body.

  2. Not entirely clear, and I believe it's somewhat platform-dependent. It involves construction, destruction, and data copies of streams and strings. I'd guess maybe 0.5-2 KB when the error message is about 200 bytes long.

  3. Exactly right.

@Jinming-Hu
Member

We're going to close this issue because of inactivity. Feel free to reopen it if you have any further questions.

@Jinming-Hu
Member

Jinming-Hu commented Jun 9, 2020

@yxiang92128 Please use rawptr_stream instead.

ref
https://github.com/microsoft/cpprestsdk/blob/master/Release/include/cpprest/rawptrstream.h#L555
https://microsoft.github.io/cpprestsdk/class_concurrency_1_1streams_1_1rawptr__stream.html

Note there's a small behavioral difference: when the rawptr_stream isn't big enough to store the HTTP response body, an exception is thrown. This becomes a problem when you try to download a small blob and the download fails, because the error message is usually hundreds of bytes.
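
For download, the usage would look roughly like this (a sketch; buf and bufsize stand for your preallocated array and its length):

#include <cpprest/rawptrstream.h>

// Open an ostream directly over the caller's preallocated array; the sdk
// writes the blob data into it without an intermediate copy of the payload.
auto os = concurrency::streams::rawptr_stream<uint8_t>::open_ostream(buf, bufsize);
blob.download_to_stream(os);
os.close().wait();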

@yxiang92128
Author

@Jinming-Hu It looks like we will have to stick with the original SDK for a while without switching to Track 2.
I have a question related to the topic we discussed extensively above:
Does the block_blob.upload_from_stream call create internal buffers while uploading? When we run massive parallel upload jobs, we see memory usage creep up until we eventually crash. Is there any optimization you would recommend when we pass a buffer to write? Should I increase or decrease stream_write_size_in_bytes to reduce the memory footprint?

Many thanks for any potential hint.

We are now in the phase where write operations are being fully implemented including upload_chunk as well.

Yang

@Jinming-Hu
Member

@yxiang92128 What is your usage scenario? For example: the blob size, the parallelism of the upload jobs, and the kind of stream you pass to upload_from_stream.

@yxiang92128
Author

yxiang92128 commented Sep 23, 2020

16 MB blobs, up to 256 threads uploading simultaneously, and the stream contents are in Parquet format.
I think the SDK might create temp buffers on the fly to support the upload until completion?

@Jinming-Hu
Member

Jinming-Hu commented Sep 23, 2020

@yxiang92128 Can you give me an example, with source code, of how you call upload_from_stream? Is the original data in a raw buffer, or a std::vector?

I think the SDK might have temp buffer created on the fly to support the upload until completion?

Yes, but we can avoid some copies

@yxiang92128
Author

yxiang92128 commented Sep 23, 2020

It's in a raw buffer, so I converted it as follows; let me know if I can do better to avoid copies:

// Push a file from a memory buffer to the cloud block blob
std::string strm((char*)buf, (size_t)len);
auto ss = Concurrency::streams::stringstream::open_istream(strm);
boost::shared_ptr<Concurrency::streams::istream> m_istream = boost::make_shared<Concurrency::streams::istream>(ss.streambuf());

block_blob.upload_from_stream(*m_istream, olen, azure::storage::access_condition(), reqOptions, context);

Thanks!

Yang

@Jinming-Hu
Member

@yxiang92128

So the original data is in buf; you make the first copy with

std::string strm((char*)buf, (size_t)len);

and I believe there's another copy in the stream

auto ss = Concurrency::streams::stringstream::open_istream(strm);

I'm not sure if there are more copies.

@Jinming-Hu
Member

Jinming-Hu commented Sep 23, 2020

@yxiang92128 Can you try this?

auto stream = concurrency::streams::rawptr_stream<uint8_t>::open_istream(buf, len);
block_blob.upload_from_stream(stream);

You have to ensure buf is not deallocated until upload_from_stream returns.

@yxiang92128
Author

AZ_blob.cpp:392:19: error: ‘rawptr_stream’ is not a member of ‘Concurrency::streams’
auto stream = concurrency::streams::rawptr_stream<uint8_t>::open_istream(buf, olen);
^
I guess the 2.10.14 version of cpprestsdk doesn't support rawptr_stream yet?

Any other workaround?

@Jinming-Hu
Copy link
Member

@yxiang92128 did you #include <cpprest/rawptrstream.h>?

@yxiang92128
Author

That worked. I will make a patch and release it to test.
But I think this approach still makes an extra copy?

Thanks!!

Yang

@Jinming-Hu
Member

@yxiang92128 Why? Where's that copy?

@Jinming-Hu
Member

@yxiang92128 We can check by uploading a huge blob, like an 8 GB one. If the process takes 16 GB of memory, then there's an extra copy.

@yxiang92128
Author

Would parallelism_factor matter in this case? Do I need to set it to 1?

@Jinming-Hu
Member

Would parallelism_factor matter in this case? Do I need to set it to 1?

I don't think so, because rawptr_stream is seekable. It would be a different story for a non-seekable stream.

@yxiang92128
Author

Got it! Thanks.
