diff --git a/libraries/ESP8266WiFi/src/include/DataSource.h b/libraries/ESP8266WiFi/src/include/DataSource.h index 7f399a0584..2a0bfed260 100644 --- a/libraries/ESP8266WiFi/src/include/DataSource.h +++ b/libraries/ESP8266WiFi/src/include/DataSource.h @@ -31,14 +31,14 @@ class BufferDataSource : public DataSource { const uint8_t* get_buffer(size_t size) override { - (void) size; + (void)size; assert(_pos + size <= _size); return _data + _pos; } void release_buffer(const uint8_t* buffer, size_t size) override { - (void) buffer; + (void)buffer; assert(buffer == _data + _pos); _pos += size; } @@ -66,28 +66,65 @@ class BufferedStreamDataSource : public DataSource { const uint8_t* get_buffer(size_t size) override { assert(_pos + size <= _size); - if (_bufferSize < size) { - _buffer.reset(new uint8_t[size]); - _bufferSize = size; + + //Data that was already read from the stream but not released (e.g. if tcp_write error occured). Otherwise this should be 0. + const size_t stream_read = _streamPos - _pos; + + //Min required buffer size: max(requested size, previous stream data already in buffer) + const size_t min_buffer_size = size > stream_read ? size : stream_read; + + //Buffer too small? + if (_bufferSize < min_buffer_size) { + uint8_t *new_buffer = new uint8_t[min_buffer_size]; + //If stream reading is ahead, than some data is already in the old buffer and needs to be copied to new resized buffer + if (_buffer && stream_read > 0) { + memcpy(new_buffer, _buffer.get(), stream_read); + } + _buffer.reset(new_buffer); + _bufferSize = min_buffer_size; + } + + //Fetch remaining data from stream + //If error in tcp_write in ClientContext::_write_some() occured earlier and therefore release_buffer was not called last time, than the requested stream data is already in the buffer. + if (size > stream_read) { + //Remaining bytes to read from stream + const size_t stream_rem = size - stream_read; + const size_t cb = _stream.readBytes(reinterpret_cast(_buffer.get() + stream_read), stream_rem); + assert(cb == stream_rem); + (void)cb; + _streamPos += stream_rem; } - size_t cb = _stream.readBytes(reinterpret_cast(_buffer.get()), size); - assert(cb == size); - (void) cb; return _buffer.get(); + } void release_buffer(const uint8_t* buffer, size_t size) override { - (void) buffer; - _pos += size; + if (size == 0) { + return; + } + + (void)buffer; + _pos += size; + + //Cannot release more than acquired through get_buffer + assert(_pos <= _streamPos); + + //Release less than requested with get_buffer? + if (_pos < _streamPos) { + // Move unreleased stream data in buffer to front + assert(_buffer); + memmove(_buffer.get(), _buffer.get() + size, _streamPos - _pos); + } } protected: - TStream& _stream; + TStream & _stream; std::unique_ptr _buffer; size_t _size; size_t _pos = 0; size_t _bufferSize = 0; + size_t _streamPos = 0; }; class ProgmemStream @@ -104,7 +141,7 @@ class ProgmemStream size_t will_read = (_left < size) ? _left : size; memcpy_P((void*)dst, (PGM_VOID_P)_buf, will_read); _left -= will_read; - _buf += will_read; + _buf += will_read; return will_read; }