OpenVDB  9.0.1
StreamCompression.h
Go to the documentation of this file.
1 // Copyright Contributors to the OpenVDB Project
2 // SPDX-License-Identifier: MPL-2.0
3 
4 /// @file points/StreamCompression.h
5 ///
6 /// @author Dan Bailey
7 ///
8 /// @brief Convenience wrappers for using Blosc and for reading and writing Paged data.
9 ///
10 /// Blosc is most effective with large (> ~256KB) blocks of data. Writing the entire
11 /// data block contiguously would provide optimal compression, but it would
12 /// limit the ability to use delayed-loading as the whole block would be required to
13 /// be loaded from disk at once. To balance these two competing factors, Paging is used
14 /// to write out blocks of data that are a reasonable size for Blosc. These Pages are
15 /// loaded lazily, tracking the input stream pointers and creating Handles that reference
16 /// portions of the buffer. When the Page buffer is accessed, the data will be read from
17 /// the stream.
18 
19 #ifndef OPENVDB_TOOLS_STREAM_COMPRESSION_HAS_BEEN_INCLUDED
20 #define OPENVDB_TOOLS_STREAM_COMPRESSION_HAS_BEEN_INCLUDED
21 
22 #include <openvdb/io/io.h>
23 #include <tbb/spin_mutex.h>
24 #include <memory>
25 #include <string>
26 
27 
28 class TestStreamCompression;
29 
30 namespace openvdb {
32 namespace OPENVDB_VERSION_NAME {
33 namespace compression {
34 
35 
36 // This is the minimum number of bytes below which Blosc compression is not used, to
37 // avoid unnecessary computation, as Blosc offers minimal compression until this limit.
38 static const int BLOSC_MINIMUM_BYTES = 48;
39 
40 // Arrays smaller than this number of bytes are padded with zeros up to this
41 // number of bytes, to allow Blosc to perform compression with small arrays.
42 static const int BLOSC_PAD_BYTES = 128;
43 
44 
45 /// @brief Returns true if compression is available
47 
48 /// @brief Retrieves the size of the supplied compressed buffer once uncompressed
49 ///
50 /// @param buffer the compressed buffer
51 OPENVDB_API size_t bloscUncompressedSize(const char* buffer);
52 
53 /// @brief Compress into the supplied buffer.
54 ///
55 /// @param compressedBuffer the buffer to compress into
56 /// @param compressedBytes number of compressed bytes (written to this variable)
57 /// @param bufferBytes the number of bytes in compressedBuffer available to be filled
58 /// @param uncompressedBuffer the uncompressed buffer to compress
59 /// @param uncompressedBytes number of uncompressed bytes
60 OPENVDB_API void bloscCompress(char* compressedBuffer, size_t& compressedBytes,
61  const size_t bufferBytes, const char* uncompressedBuffer, const size_t uncompressedBytes);
62 
63 /// @brief Compress and return the heap-allocated compressed buffer.
64 ///
65 /// @param buffer the buffer to compress
66 /// @param uncompressedBytes number of uncompressed bytes
67 /// @param compressedBytes number of compressed bytes (written to this variable)
68 /// @param resize when true, the compressed buffer is resized exactly to remove the
69 /// portion used for Blosc overhead; for efficiency this can be
70 /// skipped if it is known that the resulting buffer is temporary
71 OPENVDB_API std::unique_ptr<char[]> bloscCompress(const char* buffer,
72  const size_t uncompressedBytes, size_t& compressedBytes, const bool resize = true);
73 
74 /// @brief Convenience wrapper to retrieve the size of the supplied buffer once compressed
75 ///
76 /// @param buffer the uncompressed buffer
77 /// @param uncompressedBytes number of uncompressed bytes
78 OPENVDB_API size_t bloscCompressedSize(const char* buffer, const size_t uncompressedBytes);
79 
80 /// @brief Decompress into the supplied buffer. Throws if decompression fails or if the
81 /// uncompressed buffer has insufficient space in which to decompress.
82 ///
83 /// @param uncompressedBuffer the uncompressed buffer to decompress into
84 /// @param expectedBytes the number of bytes expected once the buffer is decompressed
85 /// @param bufferBytes the number of bytes in uncompressedBuffer available to be filled
86 /// @param compressedBuffer the compressed buffer to decompress
87 OPENVDB_API void bloscDecompress(char* uncompressedBuffer, const size_t expectedBytes,
88  const size_t bufferBytes, const char* compressedBuffer);
89 
90 /// @brief Decompress and return the heap-allocated uncompressed buffer.
91 ///
92 /// @param buffer the buffer to decompress
93 /// @param expectedBytes the number of bytes expected once the buffer is decompressed
94 /// @param resize the compressed buffer will be exactly resized to remove the
95 /// portion used for Blosc overhead, for efficiency this can be
96 /// skipped if it is known that the resulting buffer is temporary
97 OPENVDB_API std::unique_ptr<char[]> bloscDecompress(const char* buffer,
98  const size_t expectedBytes, const bool resize = true);
99 
100 
101 ////////////////////////////////////////
102 
103 
104 // PageSize in bytes: 1MB = 1048576 bytes
105 static const int PageSize = 1024 * 1024;
106 
107 
108 /// @brief Stores a variable-size, compressed, delayed-load Page of data
109 /// that is loaded into memory when accessed. Access to the Page is
110 /// thread-safe as loading and decompressing the data is protected by a mutex.
112 {
113 private:
114  struct Info
115  {
116  io::MappedFile::Ptr mappedFile;
118  std::streamoff filepos;
119  long compressedBytes;
120  long uncompressedBytes;
121  }; // Info
122 
123 public:
124  using Ptr = std::shared_ptr<Page>;
125 
126  Page() = default;
127 
128  /// @brief load the Page into memory
129  void load() const;
130 
131  /// @brief Uncompressed bytes of the Paged data, available
132  /// when the header has been read.
133  long uncompressedBytes() const;
134 
135  /// @brief Retrieves a data pointer at the specific @a index
136  /// @note Will force a Page load when called.
137  const char* buffer(const int index) const;
138 
139  /// @brief Read the Page header
140  void readHeader(std::istream&);
141 
142  /// @brief Read the Page buffers. If @a delayed is true, stream
143  /// pointers will be stored to load the data lazily.
144  void readBuffers(std::istream&, bool delayed);
145 
146  /// @brief Test if the data is out-of-core
147  bool isOutOfCore() const;
148 
149 private:
150  /// @brief Convenience method to store a copy of the supplied buffer
151  void copy(const std::unique_ptr<char[]>& temp, int pageSize);
152 
153  /// @brief Decompress and store the supplied data
154  void decompress(const std::unique_ptr<char[]>& temp);
155 
156  /// @brief Thread-safe loading of the data
157  void doLoad() const;
158 
159  std::unique_ptr<Info> mInfo = std::unique_ptr<Info>(new Info);
160  std::unique_ptr<char[]> mData;
161  tbb::spin_mutex mMutex;
162 }; // class Page
163 
164 
165 /// @brief A PageHandle holds a shared ptr to a Page and a specific stream
166 /// pointer to a point within the decompressed Page buffer
168 {
169 public:
170  using Ptr = std::unique_ptr<PageHandle>;
171 
172  /// @brief Create the page handle
173  /// @param page a shared ptr to the page that stores the buffer
174  /// @param index start position of the buffer to be read
175  /// @param size total size of the buffer to be read in bytes
176  PageHandle(const Page::Ptr& page, const int index, const int size);
177 
178  /// @brief Retrieve a reference to the stored page
179  Page& page();
180 
181  /// @brief Return the size of the buffer
182  int size() const { return mSize; }
183 
184  /// @brief Read and return the buffer, loading and decompressing
185  /// the Page if necessary.
186  std::unique_ptr<char[]> read();
187 
188  /// @brief Return a copy of this PageHandle
189  Ptr copy() { return Ptr(new PageHandle(mPage, mIndex, mSize)); }
190 
191 protected:
192  friend class ::TestStreamCompression;
193 
194 private:
195  Page::Ptr mPage;
196  int mIndex = -1;
197  int mSize = 0;
198 }; // class PageHandle
199 
200 
201 /// @brief A Paging wrapper to std::istream that is responsible for reading
202 /// from a given input stream and creating Page objects and PageHandles that
203 /// reference those pages for delayed reading.
205 {
206 public:
207  using Ptr = std::shared_ptr<PagedInputStream>;
208 
209  PagedInputStream() = default;
210 
211  explicit PagedInputStream(std::istream& is);
212 
213  /// @brief Size-only mode tags the stream as only reading size data.
214  void setSizeOnly(bool sizeOnly) { mSizeOnly = sizeOnly; }
215  bool sizeOnly() const { return mSizeOnly; }
216 
217  /// @brief Set and get the input stream
218  std::istream& getInputStream() { assert(mIs); return *mIs; }
219  void setInputStream(std::istream& is) { mIs = &is; }
220 
221  /// @brief Creates a PageHandle to access the next @a n bytes of the Page.
222  PageHandle::Ptr createHandle(std::streamsize n);
223 
224  /// @brief Takes a @a pageHandle and updates the referenced page with the
225  /// current stream pointer position and if @a delayed is false performs
226  /// an immediate read of the data.
227  void read(PageHandle::Ptr& pageHandle, std::streamsize n, bool delayed = true);
228 
229 private:
230  int mByteIndex = 0;
231  int mUncompressedBytes = 0;
232  std::istream* mIs = nullptr;
233  Page::Ptr mPage;
234  bool mSizeOnly = false;
235 }; // class PagedInputStream
236 
237 
238 /// @brief A Paging wrapper to std::ostream that is responsible for writing
239 /// from a given output stream at intervals set by the PageSize. As Pages are
240 /// variable in size, they are flushed to disk as soon as sufficiently large.
242 {
243 public:
244  using Ptr = std::shared_ptr<PagedOutputStream>;
245 
247 
248  explicit PagedOutputStream(std::ostream& os);
249 
250  /// @brief Size-only mode tags the stream as only writing size data.
251  void setSizeOnly(bool sizeOnly) { mSizeOnly = sizeOnly; }
252  bool sizeOnly() const { return mSizeOnly; }
253 
254  /// @brief Set and get the output stream
255  std::ostream& getOutputStream() { assert(mOs); return *mOs; }
256  void setOutputStream(std::ostream& os) { mOs = &os; }
257 
258  /// @brief Writes the given @a str buffer of size @a n
259  PagedOutputStream& write(const char* str, std::streamsize n);
260 
261  /// @brief Manually flushes the current page to disk if non-zero
262  void flush();
263 
264 private:
265  /// @brief Compress the @a buffer of @a size bytes and write
266  /// out to the stream.
267  void compressAndWrite(const char* buffer, size_t size);
268 
269  /// @brief Resize the internal page buffer to @a size bytes
270  void resize(size_t size);
271 
272  std::unique_ptr<char[]> mData = std::unique_ptr<char[]>(new char[PageSize]);
273  std::unique_ptr<char[]> mCompressedData = nullptr;
274  size_t mCapacity = PageSize;
275  int mBytes = 0;
276  std::ostream* mOs = nullptr;
277  bool mSizeOnly = false;
278 }; // class PagedOutputStream
279 
280 
281 } // namespace compression
282 } // namespace OPENVDB_VERSION_NAME
283 } // namespace openvdb
284 
285 #endif // OPENVDB_TOOLS_STREAM_COMPRESSION_HAS_BEEN_INCLUDED
void setInputStream(std::istream &is)
Definition: StreamCompression.h:219
#define OPENVDB_API
Definition: Platform.h:254
void setSizeOnly(bool sizeOnly)
Size-only mode tags the stream as only reading size data.
Definition: StreamCompression.h:214
OPENVDB_API size_t bloscCompressedSize(const char *buffer, const size_t uncompressedBytes)
Convenience wrapper to retrieve the compressed size of buffer when compressed.
A Paging wrapper to std::istream that is responsible for reading from a given input stream and creati...
Definition: StreamCompression.h:204
SharedPtr< MappedFile > Ptr
Definition: io.h:136
std::ostream & getOutputStream()
Set and get the output stream.
Definition: StreamCompression.h:255
std::shared_ptr< T > SharedPtr
Definition: Types.h:114
bool sizeOnly() const
Definition: StreamCompression.h:252
static fileSize_t write(std::ostream &os, const GridHandle< BufferT > &handle, Codec codec)
std::shared_ptr< PagedOutputStream > Ptr
Definition: StreamCompression.h:244
void setOutputStream(std::ostream &os)
Definition: StreamCompression.h:256
static void read(std::istream &is, GridHandle< BufferT > &handle, Codec codec)
A Paging wrapper to std::ostream that is responsible for writing from a given output stream at interv...
Definition: StreamCompression.h:241
OPENVDB_API std::unique_ptr< char[]> bloscDecompress(const char *buffer, const size_t expectedBytes, const bool resize=true)
Decompress and return the heap-allocated uncompressed buffer.
static const int PageSize
Definition: StreamCompression.h:105
Stores a variable-size, compressed, delayed-load Page of data that is loaded into memory when accesse...
Definition: StreamCompression.h:111
A PageHandle holds a unique ptr to a Page and a specific stream pointer to a point within the decompr...
Definition: StreamCompression.h:167
std::shared_ptr< Page > Ptr
Definition: StreamCompression.h:124
OPENVDB_API bool bloscCanCompress()
Returns true if compression is available.
Ptr copy()
Return a copy of this PageHandle.
Definition: StreamCompression.h:189
std::istream & getInputStream()
Definition: StreamCompression.h:218
Definition: Exceptions.h:13
std::shared_ptr< PagedInputStream > Ptr
Definition: StreamCompression.h:207
OPENVDB_API size_t bloscUncompressedSize(const char *buffer)
Retrieves the uncompressed size of buffer when uncompressed.
OPENVDB_API std::unique_ptr< char[]> bloscCompress(const char *buffer, const size_t uncompressedBytes, size_t &compressedBytes, const bool resize=true)
Compress and return the heap-allocated compressed buffer.
bool sizeOnly() const
Definition: StreamCompression.h:215
void setSizeOnly(bool sizeOnly)
Size-only mode tags the stream as only writing size data.
Definition: StreamCompression.h:251
static const int BLOSC_MINIMUM_BYTES
Definition: StreamCompression.h:38
std::unique_ptr< PageHandle > Ptr
Definition: StreamCompression.h:170
int size() const
Return the size of the buffer.
Definition: StreamCompression.h:182
#define OPENVDB_VERSION_NAME
The version namespace name for this library version.
Definition: version.h.in:116
static const int BLOSC_PAD_BYTES
Definition: StreamCompression.h:42
#define OPENVDB_USE_VERSION_NAMESPACE
Definition: version.h.in:202