Streaming via a C API presents a conundrum in Haskell: on the one hand, the C code is side-effecting; on the other, we would like to provide a lazy API.
If you are not familiar with the problem of side effects and laziness, have a look at the paper by Wadler (in particular the array updates example) and perhaps the source for the IO monad.
Consider the problem of streaming compression via a C library. We wish to ensure that the effects of calling these C functions take place in the correct order (like IO), and we wish to ensure that specific effects take place before values are read; unlike IO, we do not care if some effects are not performed when their results are not needed. A function of type
decompress :: BSL.ByteString -> IO BSL.ByteString
would force all side effects (e.g. allocations) to occur even if we only need part of the result. Morally, we would like the work to occur based on (lazy) evaluation rather than control flow.
Concretely, consider decompress from my lz4-hs package:
decompress :: BSL.ByteString -> BSL.ByteString
decompress bs = LazyST.runST $ do
    let bss = BSL.toChunks bs
    -- allocate the decompression context and the output buffer once, up front
    (ctx, buf) <- LazyST.unsafeIOToST $ do
        (err, preCtx) <- lZ4FCreateDecompressionContext lZ4FGetVersion
        -- the finalizer must be the *de*compression free function
        ctx <- castForeignPtr <$> newForeignPtr lZ4FFreeDecompressionContext (castPtr preCtx)
        check err
        dstBuf <- mallocForeignPtrBytes bufSz
        pure (ctx, dstBuf)
    BSL.fromChunks <$> loop ctx buf bss
  where
    bufSz :: Integral a => a
    bufSz = 32 * 1024
    loop :: LzDecompressionCtxPtr -> ForeignPtr a -> [BS.ByteString] -> LazyST.ST s [BS.ByteString]
    loop _ _ [] = pure []
    loop ctx buf (b:bs') = do
        (nxt, res) <- stepChunk ctx buf b
        case nxt of
            -- the whole input chunk was consumed: move on to the next one
            Nothing -> (res:) <$> loop ctx buf bs'
            -- part of the chunk is left over: feed the remainder back in
            Just next -> (res:) <$> loop ctx buf (next:bs')
    -- one unsafeIOToST per step; the bang patterns require BangPatterns
    stepChunk :: LzDecompressionCtxPtr -> ForeignPtr a -> BS.ByteString -> LazyST.ST s (Maybe BS.ByteString, BS.ByteString)
    stepChunk !ctx !dst b = LazyST.unsafeIOToST $
        BS.unsafeUseAsCStringLen b $ \(buf, sz) ->
        withForeignPtr dst $ \d ->
        alloca $ \dSzPtr ->
        alloca $ \szPtr -> do
            poke dSzPtr (fromIntegral bufSz)
            poke szPtr (fromIntegral sz)
            res <- lZ4FDecompress ctx d dSzPtr buf szPtr nullPtr
            check res
            bRead <- peek szPtr
            bWritten <- peek dSzPtr
            outBs <- BS.packCStringLen (castPtr d, fromIntegral bWritten)
            let remBs = if fromIntegral bRead == sz
                            then Nothing
                            else Just (BS.drop (fromIntegral bRead) b)
            pure (remBs, outBs)
This relies on the following c2hs code:
{# fun pure LZ4F_getVersion as ^ {} -> `CUInt' #}
type LZ4FErrorCode = CSize
{# typedef LZ4F_errorCode_t LZ4FErrorCode #}
data LzDecompressionCtx
{# pointer *LZ4F_dctx as LzDecompressionCtxPtr foreign finalizer LZ4F_freeDecompressionContext as ^ -> LzDecompressionCtx #}
{# fun LZ4F_createDecompressionContext as ^ { alloca- `Ptr LzDecompressionCtx' peek*, `CUInt' } -> `LZ4FErrorCode' #}
data LzDecompressOptions
{# pointer *LZ4F_decompressOptions_t as LzDecompressOptionsPtr -> LzDecompressOptions #}
{# fun LZ4F_decompress as ^
{ `LzDecompressionCtxPtr'
, castPtr `Ptr a'
, castPtr `Ptr CSize'
, castPtr `Ptr b'
, castPtr `Ptr CSize'
, `LzDecompressOptionsPtr'
} -> `CSize' coerce
#}
This corresponds to the following declarations from the C header lz4frame.h:
typedef size_t LZ4F_errorCode_t;
LZ4FLIB_API unsigned LZ4F_getVersion(void);
typedef struct LZ4F_dctx_s LZ4F_dctx;
typedef LZ4F_dctx* LZ4F_decompressionContext_t;
typedef struct {
  unsigned stableDst;
  unsigned reserved[3];
} LZ4F_decompressOptions_t;
LZ4FLIB_API LZ4F_errorCode_t LZ4F_createDecompressionContext(LZ4F_dctx** dctxPtr, unsigned version);
LZ4FLIB_API LZ4F_errorCode_t LZ4F_freeDecompressionContext(LZ4F_dctx* dctx);
LZ4FLIB_API size_t LZ4F_decompress(LZ4F_dctx* dctx,
                                   void* dstBuffer, size_t* dstSizePtr,
                                   const void* srcBuffer, size_t* srcSizePtr,
                                   const LZ4F_decompressOptions_t* dOptPtr);
Such APIs are common in C: a stateful object (LZ4F_dctx) sits behind a pointer (LZ4F_decompressionContext_t), and we perform a series of steps that always produces the same result. We still need to track side effects, since the data pointed to by LzDecompressionCtxPtr is mutated throughout the computation (recall the array updates example in the Wadler paper).
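In decompress, the raw context pointer is wrapped with newForeignPtr together with a C finalizer. The object behind it is therefore released when the garbage collector finds the ForeignPtr unreachable, not by an explicit call. The toy sketch below shows the same pattern. It uses libc malloc and free (through finalizerFree from Foreign.Marshal.Alloc) in place of the lz4 create/free pair. Ctx, createCtx, step and demoCtx are hypothetical names, and the "context" is just an Int counter standing in for LZ4F_dctx.

```haskell
import Control.Monad (replicateM)
import Foreign.ForeignPtr (ForeignPtr, newForeignPtr, withForeignPtr)
import Foreign.Marshal.Alloc (finalizerFree, mallocBytes)
import Foreign.Ptr (Ptr, castPtr)
import Foreign.Storable (peek, poke, sizeOf)

-- Hypothetical opaque C object, standing in for LZ4F_dctx.
data Ctx

-- Stand-in for a create function: allocate with C malloc, initialise the
-- hidden state, then attach the matching C deallocator (libc free, via
-- finalizerFree) so the GC can release it once the ForeignPtr is unreachable.
createCtx :: IO (ForeignPtr Ctx)
createCtx = do
  raw <- mallocBytes (sizeOf (0 :: Int)) :: IO (Ptr Ctx)
  poke (castPtr raw) (0 :: Int)
  newForeignPtr finalizerFree raw

-- Stand-in for a stateful step: each call mutates the object behind the
-- pointer, so its result depends on which calls ran before it.
step :: ForeignPtr Ctx -> IO Int
step ctx = withForeignPtr ctx $ \p -> do
  let cell = castPtr p :: Ptr Int
  n <- peek cell
  poke cell (n + 1)
  pure (n + 1)

-- Three steps against a single context.
demoCtx :: IO [Int]
demoCtx = do
  ctx <- createCtx
  replicateM 3 (step ctx)
```

Here demoCtx returns [1, 2, 3]. Nothing in the Haskell code ever calls free directly.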
The effects within each stepChunk need to be bunched together: we must perform lZ4FDecompress before BS.packCStringLen. At the same time, IO [BS.ByteString] is not quite what we want. It would mean lZ4FDecompress had to be called on every chunk of the input before we could read the first chunk of the output, failing to live up to the promise of laziness.
We resolve this by applying unsafeIOToST to the body of stepChunk, that is, each step is lifted into the lazy ST monad. As a result, stepChunk only does work when a new chunk of the BSL.ByteString is demanded.
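A toy sketch of this behaviour, with no lz4 involved. stepIO is a hypothetical stand-in for a side-effecting C call that counts how often it runs. decodeLazy lifts each call into lazy ST with a single unsafeIOToST per step, as stepChunk does. Forcing only the first output runs one step; forcing every output runs all of them. demoLazy should therefore report (1, 1000).

```haskell
import Control.Exception (evaluate)
import qualified Control.Monad.ST.Lazy as LazyST
import qualified Control.Monad.ST.Lazy.Unsafe as LazyST
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical stand-in for one call into a stateful C library such as
-- lZ4FDecompress: it records that it ran, then "decodes" one input chunk.
stepIO :: IORef Int -> Int -> IO Int
stepIO calls x = do
  modifyIORef' calls (+ 1)
  pure (2 * x)

-- Each step is lifted into lazy ST with unsafeIOToST, so a step runs
-- only when its output (or a later output) is demanded.
decodeLazy :: IORef Int -> [Int] -> [Int]
decodeLazy calls xs0 = LazyST.runST (loop xs0)
  where
    loop :: [Int] -> LazyST.ST s [Int]
    loop [] = pure []
    loop (x : xs) = do
      y <- LazyST.unsafeIOToST (stepIO calls x)
      ys <- loop xs
      pure (y : ys)

-- Returns (steps run after forcing the first output,
--          steps run after forcing every output).
demoLazy :: IO (Int, Int)
demoLazy = do
  calls <- newIORef 0
  let out = decodeLazy calls [1 .. 1000]
  _ <- evaluate (head out)
  afterHead <- readIORef calls
  _ <- evaluate (sum out)
  afterAll <- readIORef calls
  pure (afterHead, afterAll)
```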
In fact, if decompress is not lazy, we get pathological memory overuse. Consider the following program:
module Main (main) where
import Codec.Lz4
import qualified Data.ByteString.Lazy as BSL
import System.FilePath ((</>))
import System.IO.Temp (withSystemTempDirectory)
main :: IO ()
main = sequence_
    [ compressDump
    , decompressDump
    ]
decompressDump :: IO ()
decompressDump = withSystemTempDirectory "lz4" $
    \fp -> BSL.writeFile (fp </> "valgrind-3.15.0.tar") =<<
        (decompress <$> BSL.readFile "valgrind-3.15.0.tar.lz4")
compressDump :: IO ()
compressDump = withSystemTempDirectory "lz4" $
    \fp -> BSL.writeFile (fp </> "valgrind-3.15.0.tar.lz4") =<<
        (compress <$> BSL.readFile "valgrind-3.15.0.tar")
With the lazy ST monad, we get the following heap profile:
Had we used the strict ST monad, we would instead get:
This shows that laziness is necessary to have sensible memory use. Contrary to superstition, laziness is not synonymous with worse performance; some space leaks are strictness-induced.
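The heap profiles are the real evidence. A toy can still show the evaluation order behind them. In the sketch below, stepIO is again a hypothetical stand-in for a C call that counts its invocations. The loop uses Control.Monad.ST and Control.Monad.ST.Unsafe instead of their lazy counterparts. Strict runST completes the whole loop before returning. So demoStrict reports 1000 steps even though only the first output is inspected, whereas in the lazy ST monad forcing the first output runs only the first step.

```haskell
import Control.Exception (evaluate)
import Control.Monad.ST (ST, runST)
import Control.Monad.ST.Unsafe (unsafeIOToST)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical stand-in for one side-effecting C call; counts its calls.
stepIO :: IORef Int -> Int -> IO Int
stepIO calls x = do
  modifyIORef' calls (+ 1)
  pure (2 * x)

-- The same loop shape as decompress, but in the *strict* ST monad:
-- runST runs every step, in order, before it returns the list.
decodeStrict :: IORef Int -> [Int] -> [Int]
decodeStrict calls xs0 = runST (loop xs0)
  where
    loop :: [Int] -> ST s [Int]
    loop [] = pure []
    loop (x : xs) = do
      y <- unsafeIOToST (stepIO calls x)
      ys <- loop xs
      pure (y : ys)

-- Force only the first output and report how many steps ran.
demoStrict :: IO Int
demoStrict = do
  calls <- newIORef 0
  let out = decodeStrict calls [1 .. 1000]
  _ <- evaluate (head out)
  readIORef calls
```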
Moreover, I think this provides a superior API. C libraries handle streaming compression in various ways (a stateful decoder, callbacks); in Haskell, we use a familiar lazy linked list. The streaming and non-streaming cases are essentially the same in Haskell (have a look at lz4-hs), which is not the case in C (see lz4frame.h and lz4.h).
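The following self-contained toy has the same shape as decompress, without depending on liblz4. The "C-style" context is a single byte behind a ForeignPtr. A step consumes at most maxIn bytes and hands back the unconsumed remainder, loosely mimicking how LZ4F_decompress can leave input unread. The "codec" simply XORs output byte n with n (mod 256). decode, stepChunk and maxIn are hypothetical; only the loop structure follows the lz4-hs code.

```haskell
{-# LANGUAGE BangPatterns #-}
import qualified Control.Monad.ST.Lazy as LazyST
import qualified Control.Monad.ST.Lazy.Unsafe as LazyST
import Data.Bits (xor)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Word (Word8)
import Foreign.ForeignPtr (ForeignPtr, mallocForeignPtr, withForeignPtr)
import Foreign.Storable (peek, poke)

-- Hypothetical limit: each call consumes at most this many input bytes.
maxIn :: Int
maxIn = 4

-- Hypothetical stateful step. The context holds the number of bytes seen
-- so far (mod 256); output byte n is input byte n XORed with n (mod 256),
-- however the input happens to be chunked.
stepChunk :: ForeignPtr Word8 -> BS.ByteString -> LazyST.ST s (Maybe BS.ByteString, BS.ByteString)
stepChunk !ctx b = LazyST.unsafeIOToST $ withForeignPtr ctx $ \p -> do
    key <- peek p
    let (now, rest) = BS.splitAt maxIn b
        out = BS.pack (zipWith (\i w -> w `xor` (key + i)) [0 ..] (BS.unpack now))
    poke p (key + fromIntegral (BS.length now))
    pure (if BS.null rest then Nothing else Just rest, out)

-- Same shape as decompress: set up the context once, then loop lazily.
decode :: BSL.ByteString -> BSL.ByteString
decode bs = LazyST.runST $ do
    ctx <- LazyST.unsafeIOToST $ do
        fp <- mallocForeignPtr
        withForeignPtr fp $ \p -> poke p 0
        pure fp
    BSL.fromChunks <$> loop ctx (BSL.toChunks bs)
  where
    loop :: ForeignPtr Word8 -> [BS.ByteString] -> LazyST.ST s [BS.ByteString]
    loop _ [] = pure []
    loop ctx (b : bs') = do
        (nxt, res) <- stepChunk ctx b
        case nxt of
            Nothing -> (res :) <$> loop ctx bs'
            Just next -> (res :) <$> loop ctx (next : bs')
```

decode gives the same bytes whether its input arrives as one chunk or several. It also streams an infinite input: `BSL.unpack (BSL.take 5 (decode (BSL.repeat 0)))` evaluates to `[0,1,2,3,4]`, running only the steps needed for those five bytes.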
Technique
I haven't seen this technique put forward or evaluated anywhere before; in fact, I only figured the above out by reading Herbert Valerio Riedel's source code for lzma.
So, for reference, if you want to use this technique:
- Use ForeignPtr over Ptr. Edward Z. Yang has a series on c2hs, and you can look at the c2hs wiki as well. I have not had any success getting lazy streaming to work with ordinary Ptrs and explicit frees.
- Each step of the loop should return a LazyST.ST s a rather than an IO a; one gets an ST s a from an IO a via unsafeIOToST.
- There is no need to use unsafeIOToST more than once per iteration.
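The recipe above can be packaged as a small generic helper. This is a sketch of our own, not an API from lz4-hs or any other library; lazyStream, sumStep and demoStream are hypothetical names. The setup action allocates the context once. Each step is lifted with a single unsafeIOToST and may return leftover input to be fed back in. The toy step keeps a running total in an IORef and consumes at most two numbers per call.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import qualified Control.Monad.ST.Lazy as LazyST
import qualified Control.Monad.ST.Lazy.Unsafe as LazyST
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical helper: allocate a context once, then run one step per
-- input chunk in lazy ST, each lifted with a single unsafeIOToST.
-- A step may return unconsumed input, which is fed to the next step.
lazyStream :: forall c i o. IO c -> (c -> i -> IO (Maybe i, o)) -> [i] -> [o]
lazyStream setup step inputs = LazyST.runST $ do
    ctx <- LazyST.unsafeIOToST setup
    loop ctx inputs
  where
    loop :: c -> [i] -> LazyST.ST s [o]
    loop _ [] = pure []
    loop ctx (x : xs) = do
        (leftover, out) <- LazyST.unsafeIOToST (step ctx x)
        (out :) <$> loop ctx (maybe xs (: xs) leftover)

-- Hypothetical step: consume at most two numbers, add them to a running
-- total kept in the context, and report the new total.
sumStep :: IORef Int -> [Int] -> IO (Maybe [Int], Int)
sumStep acc chunk = do
    let (now, rest) = splitAt 2 chunk
    modifyIORef' acc (+ sum now)
    total <- readIORef acc
    pure (if null rest then Nothing else Just rest, total)

-- Two finite chunks, then a prefix of an infinite input.
demoStream :: ([Int], [Int])
demoStream =
    ( lazyStream (newIORef 0) sumStep [[1, 2, 3], [4]]
    , take 3 (lazyStream (newIORef 0) sumStep (repeat [1])) )
```

demoStream evaluates to ([3, 6, 10], [1, 2, 3]). The second component terminates because only the first three steps are ever demanded.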
Also of note, I have not had any success using unsafeInterleaveIO between steps as zstd does.