Intro to Haskell’s conduit library (Conduit 101)
Haskell’s `conduit` library provides powerful capabilities for building streaming data-transformation workflows, while providing many useful guarantees (such as constant memory usage; unexpected memory growth is something that often bites Haskell developers). In this post, I’ll walk you through some simple examples of using `conduit` to transform text, and also demonstrate some useful variations on input and output.
Note: Some of this post was taken, directly or indirectly, from several posts by `conduit` author Michael Snoyman and one by Joe Hillenbrand. This post is not meant to replace their (much better) writing. Instead, it is a step-by-step guide through different `Source`, `Sink`, and `Conduit` types, and it is by no means exhaustive. For more in-depth posts, see:
- https://www.fpcomplete.com/user/snoyberg/library-documentation/conduit-overview
- https://www.fpcomplete.com/user/joehillen/building-an-async-chat-server-with-conduit
- http://www.yesodweb.com/blog/2014/03/network-conduit-async
Note 2: All of this code is available on GitHub at https://github.com/stormont/conduit-examples, or you can clone it directly with: `git clone https://github.com/stormont/conduit-examples.git`
Prerequisites
- Have GHC version >= 7.8 installed (examples tested against 7.8.3)
- Have Cabal version >= 1.20 installed (examples tested against 1.20.0.2)
- These examples have been tested on Windows 8.1 and Amazon Linux AMI
- `telnet` or an equivalent tool must be available on the system (for running the later examples)
- It is strongly recommended to build within a sandbox environment: `cabal sandbox init`
- Run `cabal configure` first to see which libraries you need to install with `cabal install library-name`
Before we begin, some header stuff. These will change for the later examples; I’ll call this out again at that time.
```haskell
{-# LANGUAGE RankNTypes #-}
module Main where

import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import Control.Monad.State
import Control.Monad.Trans.Resource
import Data.ByteString.Char8 (ByteString, empty, pack, unpack)
import Data.Conduit
import System.IO (stdin)
```
Building a Conduit
For our first example, we’ll build a trivial pipeline that hooks up a `Source` to a `Sink`. Every complete conduit pipeline needs a `Source` at its start and a `Sink` at its end in order to run. We’ll build a `Source` that takes a `String` input argument, breaks it up into words, and sends the resulting list of words downstream (as a single element) to a `Sink`, which returns that list as its result.
```haskell
wordsSource :: Monad m => String -> Producer m [String]
wordsSource = yield . words -- yield pushes its argument downstream to the next component in the chain

identitySink :: Monad m => ConduitM [a] o m [a]
identitySink = CL.foldMap id -- Concatenates the incoming lists (here, just one) and returns the result

main :: IO ()
main = do
  let input = "The quick brown fox jumped over the lazy dog"
  xs <- wordsSource input $$ identitySink -- Connects our Source to our Sink and captures the result
  mapM_ putStrLn xs
```
In the GitHub project, this is `splitWordsExample` under `examples/conduit-101-01`.
Here, our `wordsSource` `Producer` (a generalized form of `Source` that can be used anywhere a `Source` is expected) generates a `[String]`. Our `identitySink` `ConduitM` consumes a `[a]`, produces a stream of `o`, and returns a final result of `[a]`. Since `identitySink` never yields anything, `o` is left unconstrained; we simply capture the final result.
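Because `wordsSource` and `identitySink` only require `Monad m`, the pipeline does not actually need `IO`. The sketch below runs it in the `Identity` monad; `splitWordsPure` is a hypothetical helper name, and the types are spelled out with `ConduitM` instead of `Producer` so that no `RankNTypes` pragma is needed.

```haskell
import Data.Conduit (ConduitM, yield, ($$))
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (runIdentity)

-- Same Source as above, with the Producer synonym written out as ConduitM.
wordsSource :: Monad m => String -> ConduitM i [String] m ()
wordsSource = yield . words

-- Same Sink as above: concatenates every incoming list and returns the result.
identitySink :: Monad m => ConduitM [a] o m [a]
identitySink = CL.foldMap id

-- Hypothetical helper: runs the whole pipeline in Identity, so no IO is involved.
splitWordsPure :: String -> [String]
splitWordsPure input = runIdentity (wordsSource input $$ identitySink)

main :: IO ()
main = print (splitWordsPure "The quick brown fox")
```

This is also a convenient way to unit-test pipeline stages before wiring them to real I/O.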
A conduit pipeline terminates when no more data is available in the stream. This is different from simply waiting for input: a downstream component that calls `await` blocks until its upstream component either yields a value or finishes. Once the upstream component finishes, `await` returns `Nothing`, and the downstream components wrap up and stop. Don’t worry, we’ll show some examples of this later.
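To make the termination rule concrete, here is a small sketch of a hand-written `Sink` built directly on `await`. `countSink` and `countWords` are hypothetical names; `CL.sourceList` emits each list element as a separate stream element (unlike `wordsSource`, which yields the whole list at once).

```haskell
import Data.Conduit (ConduitM, await, ($$))
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (runIdentity)

-- A hand-written Sink that counts incoming elements.
-- await returns Just x while upstream has data, and Nothing once upstream has finished.
countSink :: Monad m => ConduitM a o m Int
countSink = go 0
  where
    go n = do
      mx <- await
      case mx of
        Nothing -> return n   -- Upstream terminated: stop and return the count
        Just _  -> go (n + 1) -- Got an element: count it and wait for the next one

-- Hypothetical helper: streams each word as its own element and counts them.
countWords :: String -> Int
countWords input = runIdentity (CL.sourceList (words input) $$ countSink)

main :: IO ()
main = print (countWords "The quick brown fox jumped over the lazy dog")
```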
If the different names confuse you, remember this:
- `Source`s produce data (and expect no upstream input)
- `Sink`s consume data (and send no data downstream)
- `Producer` is a generalized type synonym that can be used wherever a `Source` is expected
- `Consumer` is a generalized type synonym that can be used wherever a `Sink` is expected
- `Conduit`s usually refer to the intermediate pieces stacked in between (but in reality, everything is a `ConduitM`)
Adding an Intermediate Transformation
Now, we’ll take this example and do something slightly more complex: adding an additional `Conduit` in the middle of the stack to transform our words. In this case, we’ll just count the number of characters in each word.
```haskell
numLettersConduit :: Monad m => Conduit [String] m [Int]
numLettersConduit = CL.map (map length) -- Replaces each list of words with the list of their lengths

main :: IO ()
main = do
  let input = "The quick brown fox jumped over the lazy dog"
  xs <- wordsSource input
    $= numLettersConduit -- Plug it in to the middle of the conduit stack
    $$ identitySink
  mapM_ print xs
```
In the GitHub project, this is `wordLengthExample_1` under `examples/conduit-101-01`.
Here, our `numLettersConduit` `Conduit` consumes a stream of `[String]` and outputs a stream of `[Int]`.
That was easy! A quick word on the `$=` and `$$` operators. They belong to a small family of connection operators that can be combined in different ways (the others include `=$`, which attaches a `Conduit` to a `Sink`, and `=$=`, which fuses two components together). In this example, `$=` can be read as “connect a Source (`$`) to a Conduit (`=`)”, while `$$` can be read as “connect a Source (`$`) to a Sink (`$`)”. (When you attach one or more `Conduit`s to a `Source` with `$=`, the result is itself a `Source` for the downstream `Sink`.)
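As a sketch of how these conduit 1.x operators relate, the three groupings below of the word-length pipeline all produce the same result. The helper names `viaSource`, `viaSink`, and `viaFuse` are hypothetical, and the pipeline runs in `Identity` so it can be checked without I/O.

```haskell
import Data.Conduit (ConduitM, yield, ($$), ($=), (=$), (=$=))
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (runIdentity)

wordsSource :: Monad m => String -> ConduitM i [String] m ()
wordsSource = yield . words

numLettersConduit :: Monad m => ConduitM [String] [Int] m ()
numLettersConduit = CL.map (map length)

identitySink :: Monad m => ConduitM [a] o m [a]
identitySink = CL.foldMap id

-- 1. Attach the Conduit to the Source first, as in the post.
viaSource :: String -> [Int]
viaSource s = runIdentity (wordsSource s $= numLettersConduit $$ identitySink)

-- 2. Attach the Conduit to the Sink first, with =$.
viaSink :: String -> [Int]
viaSink s = runIdentity (wordsSource s $$ numLettersConduit =$ identitySink)

-- 3. Fuse the Conduit and the Sink with the general =$= operator.
viaFuse :: String -> [Int]
viaFuse s = runIdentity (wordsSource s $$ (numLettersConduit =$= identitySink))

main :: IO ()
main = mapM_ (print . ($ "The quick brown fox")) [viaSource, viaSink, viaFuse]
```

Operator precedence does the grouping here: `$=` binds tighter than `$$`, and `=$` binds tighter still, so no extra parentheses are needed in the first two forms.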
More Transformations
You can plug in more `Conduit`s along the way to perform additional transformations, as long as the types line up. Here, we’ll build upon the last example by converting our word-length `Int`s back into `String`s.
```haskell
showConduit :: Monad m => Conduit [Int] m [String]
showConduit = CL.map (map show) -- Shows each Int as a String

main :: IO ()
main = do
  let input = "The quick brown fox jumped over the lazy dog"
  xs <- wordsSource input
    $= numLettersConduit
    $= showConduit -- Multiple conduits can be piped together
    $$ identitySink
  mapM_ putStrLn xs
```
In the GitHub project, this is `wordLengthExample_2` under `examples/conduit-101-01`.
Here, our `showConduit` `Conduit` consumes a stream of `[Int]` and outputs a stream of `[String]`.
As you can see, we simply piped multiple `Conduit`s into the middle of the stack. This works as long as each `Conduit`’s input type matches the output type of the component just upstream of it.
Fusing Conduits
Lastly, we can take the last example and “fuse” multiple `Conduit`s together into a single reference:
```haskell
fusedConduit :: Monad m => Conduit [String] m [String]
fusedConduit = fuse numLettersConduit showConduit -- Fuses the two references together

main :: IO ()
main = do
  let input = "The quick brown fox jumped over the lazy dog"
  xs <- wordsSource input
    $= fusedConduit -- Can now be used as a single reference
    $$ identitySink
  mapM_ putStrLn xs
```
In the GitHub project, this is `wordLengthExample_3` under `examples/conduit-101-01`.
Here, we simply define a new `Conduit`, `fusedConduit`, which consumes a stream of `[String]` and outputs a stream of `[String]` by fusing our `numLettersConduit` and `showConduit` conduits together. The fused pair can then be used as a single reference.
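For comparison, newer `conduit` releases replace `$$`, `$=`, and `fuse` with a single composition operator, `.|`, plus an explicit runner. The sketch below assumes conduit 1.3, which provides both `.|` and `runConduitPure`; `wordLengthStrings` is a hypothetical helper name.

```haskell
import Data.Conduit (ConduitM, runConduitPure, yield, (.|))
import qualified Data.Conduit.List as CL

wordsSource :: Monad m => String -> ConduitM i [String] m ()
wordsSource = yield . words

numLettersConduit :: Monad m => ConduitM [String] [Int] m ()
numLettersConduit = CL.map (map length)

showConduit :: Monad m => ConduitM [Int] [String] m ()
showConduit = CL.map (map show)

-- .| composes any two components, so it also plays the role of fuse.
fusedConduit :: Monad m => ConduitM [String] [String] m ()
fusedConduit = numLettersConduit .| showConduit

-- runConduitPure runs a complete pipeline (Source .| ... .| Sink) without IO.
wordLengthStrings :: String -> [String]
wordLengthStrings input =
  runConduitPure (wordsSource input .| fusedConduit .| CL.foldMap id)

main :: IO ()
main = mapM_ putStrLn (wordLengthStrings "The quick brown fox jumped over the lazy dog")
```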
Variations on using stdin/stdout
Now, let’s get into some variations on using standard stream handles with our conduits.
Getting Input from stdin
A simple tweak on our above examples to get user input using `stdin`:
```haskell
main :: IO ()
main = do
  input <- getLine
  xs <- wordsSource input $$ identitySink
  mapM_ putStrLn xs
```
In the GitHub project, this is `userInputExample_1` under `examples/conduit-101-01`.
There’s nothing special or `conduit`-y about this, but it sets us up for the next examples.
Moving stdin and stdout Within our Conduits
```haskell
userInputSource :: Producer IO String
userInputSource = do
  let loop acc = do
        c <- liftIO getChar -- Get a character of user input
        case c of
          ' '  -> do                  -- On a space, we'll push what we have so far
            yield $ reverse acc       -- We yield the collection downstream
            loop []                   -- We reset the collection and loop
          '\n' -> yield $ reverse acc -- On a newline, we'll push what we have and stop looping
          _    -> loop (c : acc)      -- We collect the character and loop
  loop [] -- We simply start the loop

identityStdOutSink :: Consumer String IO ()
identityStdOutSink = awaitForever $ liftIO . putStrLn -- For each String input, we'll output it to the console

main :: IO ()
main = userInputSource $$ identityStdOutSink
```
In the GitHub project, this is `userInputExample_2` under `examples/conduit-101-01`.
`Consumer` is a generalized type synonym for a `Sink`; here, it receives a stream of `String`s, produces no output stream, and returns `()`.
What we’re doing here is looping over input characters in our `Source` and only pushing data downstream when we hit a space or newline character; after a newline, the `Source` finishes, which ends the whole pipeline. Our `Sink` simply takes each element from the input stream and writes it to `stdout` on its own line. `awaitForever` waits for each “chunk” of input from the stream, until the stream terminates.
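Under the hood, `awaitForever` behaves like a loop around `await` that stops once `await` returns `Nothing`. The sketch below spells that out with a hypothetical `myAwaitForever` (the library’s actual definition may differ in detail) and checks that it behaves like the library function on a small, pure input; `shout`, `shout'`, and `runWith` are also hypothetical names.

```haskell
import Data.Char (toUpper)
import Data.Conduit (ConduitM, await, awaitForever, yield, ($$), ($=))
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (Identity, runIdentity)

-- Hypothetical hand-rolled equivalent of awaitForever, for illustration only.
myAwaitForever :: Monad m => (i -> ConduitM i o m ()) -> ConduitM i o m ()
myAwaitForever f = loop
  where
    loop = do
      mx <- await
      case mx of
        Nothing -> return ()   -- Upstream has finished: stop
        Just x  -> f x >> loop -- Handle one chunk, then wait for the next

-- Upper-cases every incoming String, using the library's awaitForever.
shout :: Monad m => ConduitM String String m ()
shout = awaitForever (yield . map toUpper)

-- The same transformation, using the hand-rolled loop.
shout' :: Monad m => ConduitM String String m ()
shout' = myAwaitForever (yield . map toUpper)

-- Runs a String-to-String conduit over a fixed list of inputs and collects the outputs.
runWith :: ConduitM String String Identity () -> [String] -> [String]
runWith c xs = runIdentity (CL.sourceList xs $= c $$ CL.consume)

main :: IO ()
main = do
  print (runWith shout ["the", "quick", "fox"])
  print (runWith shout' ["the", "quick", "fox"])
```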
Sending Multiple Streams of Input Downstream
Of course, we don’t have to terminate right away. Here, we play with the input a bit, pushing as much downstream as we like, until the user enters an empty line:
```haskell
userInputConduitSource :: Source IO [String]
userInputConduitSource =
  CB.sourceHandle stdin -- Using a handle to stdin...
    $= CB.lines         -- ...stream lines of input...
    $= loop             -- ...to our internal loop conduit
  where
    loop = do
      liftIO $ putStr "> " -- Prompt (with line-buffered stdout, it may not appear until flushed)
      r <- yieldStrings    -- Yield one line's words downstream
      case r of
        False -> return () -- If we've hit an empty line or the end of input, stop
        True  -> loop      -- Otherwise, don't terminate the loop conduit

yieldStrings :: ConduitM ByteString [String] IO Bool
yieldStrings = do
  mbs <- await -- Wait for stream input
  case mbs of
    Nothing -> return False -- If we don't get any input, signal termination
    Just bs -> do
      -- Unpack the ByteString and trim off trailing newline characters
      let s = takeWhile (\x -> x /= '\r' && x /= '\n') $ unpack bs
      case s of
        "" -> return False -- If we get empty input, signal termination
        ss -> do           -- Otherwise, break the input into words and yield downstream
          yield $ words ss
          return True      -- ...and signal non-termination

-- Not shown in the original post; assumed definition: prints each word on its own line
identitiesStdOutSink :: Consumer [String] IO ()
identitiesStdOutSink = awaitForever $ liftIO . mapM_ putStrLn

main :: IO ()
main = userInputConduitSource $$ identitiesStdOutSink
```
In the GitHub project, this is `userInputExample_3` under `examples/conduit-101-01`.
This one is a bit more complex, in order to demonstrate some of the `conduit` library functions available for consuming input from `stdin`, as well as how to perform custom transformations within a conduit. The `CB.`-prefixed functions come from `Data.Conduit.Binary`; there’s a lot of useful stuff in there. I recommend reading its documentation (it’s well written, too).
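`CB.lines` is what turns the arbitrarily sized chunks read from a handle into one element per line. A small pure sketch (the helper name `rechunkLines` is hypothetical) shows how it re-splits chunks that don’t line up with newline boundaries. It removes the `\n` byte but leaves any `\r`, which is why `yieldStrings` above still trims carriage returns.

```haskell
import qualified Data.ByteString.Char8 as BC
import Data.Conduit (($$), ($=))
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (runIdentity)

-- Hypothetical helper: pushes raw ByteString chunks through CB.lines and collects the lines.
rechunkLines :: [BC.ByteString] -> [BC.ByteString]
rechunkLines chunks = runIdentity (CL.sourceList chunks $= CB.lines $$ CL.consume)

main :: IO ()
main = print (rechunkLines (map BC.pack ["hel", "lo\nwor", "ld\n"]))
```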
The `await` function blocks until the stream provides some input (or signals that it has ended, in which case it returns `Nothing`), at which point you can perform custom handling. But, unlike `awaitForever`, it won’t loop automatically! If you don’t provide some looping mechanism (as we’ve done by calling `yieldStrings` repeatedly from `loop`), the conduit will finish after handling a single element.
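The explicit recursion is easier to check without `stdin` if you drop the prompt and generalize the monad. The sketch below is a hypothetical pure variant, `wordsUntilBlank`, that merges `loop` and `yieldStrings` into one conduit. It makes the termination behavior visible: after the first empty line, nothing else is read, because the conduit never calls itself again.

```haskell
import qualified Data.ByteString.Char8 as BC
import Data.Conduit (ConduitM, await, yield, ($$), ($=))
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (runIdentity)

-- Hypothetical pure variant of loop + yieldStrings: emits the words of each line,
-- and stops at the first empty line or when upstream runs out of input.
wordsUntilBlank :: Monad m => ConduitM BC.ByteString [String] m ()
wordsUntilBlank = do
  mbs <- await                                   -- Wait for the next line
  case mbs of
    Nothing -> return ()                         -- No more input: stop
    Just bs ->
      case takeWhile (`notElem` "\r\n") (BC.unpack bs) of
        "" -> return ()                          -- Empty line: stop; later input is never read
        s  -> yield (words s) >> wordsUntilBlank -- Emit the words, then loop explicitly

-- Hypothetical helper: feeds a list of lines through wordsUntilBlank.
runWordsUntilBlank :: [BC.ByteString] -> [[String]]
runWordsUntilBlank ls = runIdentity (CL.sourceList ls $= wordsUntilBlank $$ CL.consume)

main :: IO ()
main = print (runWordsUntilBlank (map BC.pack ["hello world", "foo\r", "", "never seen"]))
```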
The other tidbit shown here is that a composed pipeline (`CB.sourceHandle stdin $= CB.lines $= loop`) can itself be used as a `Source` in a larger conduit. This is very handy when you need some buffering (here, `CB.lines` re-chunking raw input into lines) before your own transformation runs.
Using File Streams
Of course, `conduit` comes with some excellent built-in accessors for file streams. Let’s see how these can be used.
Sourcing Input by Reading a File
Here, we’ll read input from a file within a `Source`, then send the data downstream to be output to `stdout`:
```haskell
fileInputSource :: MonadResource m => FilePath -> Source m [String]
fileInputSource file =
  CB.sourceFile file -- Reads input from the file
    $= loop          -- ...and feeds it to our loop
  where
    loop = do
      mbs <- await -- Await the next chunk of file input
      case mbs of
        Nothing -> return () -- On no input, terminate
        Just bs -> do
          yield $ words $ unpack bs -- Split the raw chunk into words and push them downstream
          loop                      -- ...and continue looping

-- Not shown in the original post; assumed definition: prints each word on its own line
identitiesResStdOutSink :: MonadResource m => Consumer [String] m ()
identitiesResStdOutSink = awaitForever $ liftIO . mapM_ putStrLn

main :: IO ()
main = do
  let file = "examples/conduit-101-01/words.txt"
  runResourceT               -- Ensures the file handle is released, even on exceptions
    $ fileInputSource file   -- ...and fire up our conduit
    $$ identitiesResStdOutSink
```
In the GitHub project, this is `filesExample_1` under `examples/conduit-101-01`. If you examine the input file, you’ll see the same “lazy dog” text we’ve already been working with.
We haven’t done much differently here from the `stdin` example above. We’ve simply used the `CB` library function `sourceFile` to read input from a file, rather than from `stdin`.
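The other new feature is the use of `runResourceT`. It runs the computation in the `ResourceT` monad transformer (an instance of `MonadResource`), which tracks resources such as the file handle opened by `sourceFile` and guarantees that they are released, even if an exception interrupts the computation. This is always a good plan, as file and network I/O can often be fickle.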
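A self-contained sketch of reading a file this way follows; `fileWords` and the demo file name are hypothetical. One design difference from `fileInputSource`: that loop splits each raw chunk from `sourceFile` into words, so on a file large enough to span several chunks a word straddling a chunk boundary would be cut in two. This sketch re-chunks by line with `CB.lines` first.

```haskell
import Control.Monad.Trans.Resource (runResourceT)
import qualified Data.ByteString.Char8 as BC
import Data.Conduit (($$), ($=))
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL

-- Hypothetical helper: returns the words of a file, one list per line.
fileWords :: FilePath -> IO [[String]]
fileWords path = runResourceT $
  CB.sourceFile path              -- Opens the file; the handle is registered with ResourceT
    $= CB.lines                   -- Re-chunk the raw bytes into lines
    $= CL.map (words . BC.unpack) -- Split each line into words
    $$ CL.consume                 -- Collect everything into a list

main :: IO ()
main = do
  writeFile "words-demo.txt" "The quick brown fox\njumped over the lazy dog\n"
  fileWords "words-demo.txt" >>= print
```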
Sinking Output to a File
Now that we’ve demonstrated how to read input from a file, let’s show how you can stream output to a file:
```haskell
conduitToByteString :: Monad m => Conduit [String] m ByteString
conduitToByteString = awaitForever $ yield . pack . unlines -- Takes the input [String] and yields it as a single ByteString downstream

main :: IO ()
main = do
  let input  = "examples/conduit-101-01/words.txt"
      output = "examples/conduit-101-01/output.txt"
  runResourceT
    $ fileInputSource input
    $= conduitToByteString
    $$ CB.sinkFile output -- Writes the input stream to a file
```
In the GitHub project, this is `filesExample_2` under `examples/conduit-101-01`. After running this example, you should see an `examples/conduit-101-01/output.txt` file that contains the original input, with each word on its own line.
This example is also pretty simple. We’re just taking our stream and writing it to a file, using the `CB` function `sinkFile`. The only caveat is that `sinkFile` expects a stream of `ByteString`s, so we first transform each `[String]` input into a `ByteString`.
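A self-contained sketch of the same idea, fed from an in-memory list instead of `words.txt`, makes the round trip easy to check. `writeWordsPerLine` and the demo file name are hypothetical.

```haskell
import Control.Monad.Trans.Resource (runResourceT)
import qualified Data.ByteString.Char8 as BC
import Data.Conduit (awaitForever, yield, ($$), ($=))
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL

-- Hypothetical helper: writes each list of words to a file, one word per line.
writeWordsPerLine :: FilePath -> [[String]] -> IO ()
writeWordsPerLine path chunks = runResourceT $
  CL.sourceList chunks
    $= awaitForever (yield . BC.pack . unlines) -- Same role as conduitToByteString
    $$ CB.sinkFile path                         -- Opens, writes, and closes the file

main :: IO ()
main = do
  writeWordsPerLine "output-demo.txt" [["The", "quick"], ["brown", "fox"]]
  readFile "output-demo.txt" >>= putStr
```

One caveat that applies to `conduitToByteString` as well: `Data.ByteString.Char8.pack` keeps only the low 8 bits of each `Char`, so text outside Latin-1 would be mangled; an explicit encoder (for example UTF-8 via the `text` package) avoids that.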
Streaming Multiple Input Files to a Single Output
As a last variation on file I/O, let’s see how we can take input from multiple files and stream it to `stdout`:
```haskell
multiFileInputSource :: MonadResource m => [FilePath] -> Source m FilePath
multiFileInputSource files = mapM_ yield files -- Yields each FilePath downstream, one at a time

conduitFile :: MonadResource m => Conduit FilePath m ByteString
conduitFile = awaitForever CB.sourceFile -- For each FilePath input, yield its file contents downstream

byteStringStdOutSink :: MonadResource m => Consumer ByteString m ()
byteStringStdOutSink = awaitForever $ liftIO . putStrLn . unpack -- Prints each chunk (putStrLn adds a newline after every chunk)

main :: IO ()
main = do
  let input1 = "examples/conduit-101-01/words.txt"
      input2 = "examples/conduit-101-01/lengths.txt"
  runResourceT
    $ multiFileInputSource [input1, input2]
    $= conduitFile
    $$ byteStringStdOutSink
```
In the GitHub project, this is `filesExample_3` under `examples/conduit-101-01`.
This also isn’t very complex. We’ve simply taken the set of files as our initial stream of inputs, then for each one, read its contents and yielded them downstream.
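Here is a self-contained sketch of the same pattern that collects the bytes instead of printing them; `concatFiles` and the demo file names are hypothetical. Unlike `byteStringStdOutSink`, which prints each chunk with `putStrLn` and therefore adds a line break after every chunk, this keeps the bytes exactly as read.

```haskell
import Control.Monad.Trans.Resource (runResourceT)
import qualified Data.ByteString.Char8 as BC
import Data.Conduit (awaitForever, ($$), ($=))
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL

-- Hypothetical helper: streams several files, in order, and returns their concatenated contents.
concatFiles :: [FilePath] -> IO BC.ByteString
concatFiles paths = runResourceT $
  CL.sourceList paths             -- One FilePath per stream element
    $= awaitForever CB.sourceFile -- Each file is opened, streamed, and closed before the next one
    $$ fmap BC.concat CL.consume  -- Glue all chunks back together

main :: IO ()
main = do
  writeFile "part1-demo.txt" "The quick brown fox\n"
  writeFile "part2-demo.txt" "jumped over the lazy dog\n"
  concatFiles ["part1-demo.txt", "part2-demo.txt"] >>= BC.putStr
```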
What About Network I/O?
We’ve now covered standard streams and file I/O. But what about networking? Luckily, there’s a `conduit` library for that: `Data.Conduit.Network` (in the `conduit-extra` package).
Reading Input from a Socket
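For the first variation, we’ll read input from a socket and output it to `stdout`. This involves some new imports; the header at the top of the code below is an assumed minimal version (the exact header in the GitHub project may differ), and it relies on `OverloadedStrings` for the `"*"` host literal. Note also that `splitWords` must output `[String]`, matching `yieldStrings`:

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.IO.Class (liftIO)
import Data.ByteString.Char8 (ByteString, unpack)
import Data.Conduit
import Data.Conduit.Network
import Network.Socket (withSocketsDo)

-- yieldStrings is the same as in userInputExample_3 (not repeated here).
-- identitiesStdOutSink is not shown in the post; this is an assumed definition.
identitiesStdOutSink :: ConduitM [String] o IO ()
identitiesStdOutSink = awaitForever $ liftIO . mapM_ putStrLn

splitWords :: ConduitM ByteString [String] IO ()
splitWords = loop
  where
    loop = do
      r <- yieldStrings
      case r of
        False -> return ()
        True  -> loop

main :: IO ()
main = withSocketsDo $ -- On Windows, we need this to properly enable sockets; on *nix, this is a safe no-op
  runTCPServer (serverSettings 4000 "*") $ \appData -> -- Runs a TCP server at port 4000; for each connection...
    appSource appData        -- ...stream the input from its source socket
      $= splitWords          -- ...do the same line-to-word split we've seen previously
      $$ identitiesStdOutSink -- ...and output to stdout
```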
In the GitHub project, this is `main` under `examples/conduit-101-02`. Hang on to this one; we’ll be reusing it in the next examples, too.
You’ll need `telnet` or a similar tool to talk to the server; you can fire it up with: `telnet 127.0.0.1 4000`
There are really only two new pieces here. The library function `runTCPServer` starts a TCP server (in this case, on port 4000) and runs our handler for each client connection, passing it an `AppData` value that we call `appData`. To get a stream of the `ByteString`s the client sends, you call the library function `appSource` on that value. The `serverSettings` helper builds the server configuration from a port and a host preference (`"*"` here); check out the documentation for more fine-grained control.
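`splitWords` here is nearly the same as the `userInputConduitSource` we saw previously, but simpler, since `appSource` already provides the byte stream. Note that there is no `CB.lines` stage: each chunk received from the socket is treated as one line, which usually holds for interactive `telnet` input but is not guaranteed by TCP.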
A caveat to this example: to mimic the prior examples, the server only looks at each received chunk up to its first newline character, and it stops reading from a connection as soon as it receives an empty line (or the client disconnects). This may surprise you in this and the later examples if you forget about this detail.
Note: We should always use `withSocketsDo` whenever the code might run on Windows. Otherwise, you’ll get not-so-helpful error messages when you try to start the server, such as: `getAddrInfo: does not exist (error 10093)`
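The server above only reads. As a further sketch using the same library functions, a server can also write back to each client through `appSink`; pairing it with `runTCPClient` (introduced in the next section) gives a self-contained round trip. `echoServer`, `sendAndReceive`, and port 4005 are hypothetical choices, and the `threadDelay` is a crude way to wait for the listener to start.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent (forkIO, threadDelay)
import Data.ByteString (ByteString)
import Data.Conduit (await, yield, ($$))
import Data.Conduit.Network
import Network.Socket (withSocketsDo)

-- Hypothetical echo server: every byte received from a client is written straight back to it.
echoServer :: Int -> IO ()
echoServer port =
  runTCPServer (serverSettings port "127.0.0.1") $ \appData ->
    appSource appData $$ appSink appData

-- Hypothetical client: sends one message and returns the first chunk it receives back.
sendAndReceive :: Int -> ByteString -> IO (Maybe ByteString)
sendAndReceive port msg =
  runTCPClient (clientSettings port "127.0.0.1") $ \server -> do
    yield msg $$ appSink server -- Send the message
    appSource server $$ await   -- Wait for the first chunk of the reply

main :: IO ()
main = withSocketsDo $ do
  _ <- forkIO (echoServer 4005) -- Run the server in a background thread
  threadDelay 200000            -- Crude: give the server time to start listening
  reply <- sendAndReceive 4005 "hello\n"
  print reply
```

Reading only the first chunk keeps the sketch short; a real client would keep reading until it has a complete message, since TCP may split or merge writes.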
Writing Data to a Socket
So, that’s reading from a socket. What about writing to a socket?
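```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit
import Data.Conduit.Network
import Network.Socket (withSocketsDo)

main :: IO ()
main = withSocketsDo $
  runTCPClient (clientSettings 4000 "localhost") $ \server -> -- Fire up a TCP client
    yield "hello world\n" -- ...yield a string downstream
      $$ appSink server   -- ...which is sunk to the server socket
```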
In the GitHub project, this is `main` under `examples/conduit-101-03`. You’ll first want to fire up the example server from `examples/conduit-101-02`, so there is a socket to talk to.
Again, pretty simple. This time we start a TCP client with the library function `runTCPClient`. With our server connection in hand, we yield data downstream and use the library function `appSink` to sink it into the socket. The library function `clientSettings` configures the connection to the server (its port and host must match where the server is listening).
The trailing newline in our yielded string just illustrates a nuance of the `examples/conduit-101-02` server: it only reads each received chunk up to its first newline character.
Reading and Writing to Sockets (Let’s Make a Proxy!)
Note: This is gratuitously stolen from Michael Snoyman’s proxy example. For more in-depth fun, see his post and build ANGRY NETWORK CONDUITS.
We’re going to read input from a socket, output it to our local `stdout`, then stream the data to another socket:
```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString as B
import Data.Conduit
import Data.Conduit.Network
import Network.Socket (withSocketsDo)

echo :: ConduitM B.ByteString B.ByteString IO ()
echo = awaitForever $ \x -> do
  liftIO $ B.putStr x -- Just output the stream to stdout...
  yield x             -- ...before yielding it downstream

main :: IO ()
main = withSocketsDo $
  runTCPServer (serverSettings 4002 "*") $ \client ->
    runTCPClient (clientSettings 4000 "localhost") $ \server ->
      appSource client $= echo $$ appSink server -- Receive input from the client, echo it out, and sink it to the server
```
In the GitHub project, this is `main` under `examples/conduit-101-04`. You’ll first want to fire up the example server from `examples/conduit-101-02`, so there is a “real” server to talk to. You’ll also want to run `telnet 127.0.0.1 4002` as a client of the proxy layer (this piece).
We haven’t really introduced anything new in this example; we’ve simply combined the two previous examples. We start a TCP server for the proxy layer on port 4002, and for each client connection we connect as a client to our already-running server on port 4000 (you did fire up the `examples/conduit-101-02` example, right?). Then, once our `telnet` client connects to the proxy on port 4002, we shuttle its messages to the real server on port 4000. Note that data only flows one way here: anything the real server sends back is never forwarded to the `telnet` client.
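A two-way proxy needs a second pipeline running at the same time in the opposite direction. A sketch of that idea, assuming the `async` package’s `race_` (the names `proxy`, `echoBackend`, `roundTrip` and the ports are hypothetical, and the echo backend stands in for a real server that replies):

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Async (race_)
import Data.ByteString (ByteString)
import Data.Conduit (await, yield, ($$))
import Data.Conduit.Network
import Network.Socket (withSocketsDo)

-- Hypothetical two-way proxy: forwards client -> backend and backend -> client at the same time.
proxy :: Int -> Int -> IO ()
proxy listenPort backendPort =
  runTCPServer (serverSettings listenPort "127.0.0.1") $ \client ->
    runTCPClient (clientSettings backendPort "127.0.0.1") $ \server ->
      race_
        (appSource client $$ appSink server) -- Client to backend
        (appSource server $$ appSink client) -- Backend to client

-- Stand-in backend for the demo: echoes everything back.
echoBackend :: Int -> IO ()
echoBackend port =
  runTCPServer (serverSettings port "127.0.0.1") $ \app ->
    appSource app $$ appSink app

-- Sends one message to the given port and returns the first chunk of the reply.
roundTrip :: Int -> ByteString -> IO (Maybe ByteString)
roundTrip port msg =
  runTCPClient (clientSettings port "127.0.0.1") $ \conn -> do
    yield msg $$ appSink conn
    appSource conn $$ await

main :: IO ()
main = withSocketsDo $ do
  _ <- forkIO (echoBackend 4010) -- Backend on port 4010
  _ <- forkIO (proxy 4012 4010)  -- Proxy on port 4012, forwarding to 4010
  threadDelay 200000             -- Crude: give both listeners time to start
  roundTrip 4012 "hello through the proxy\n" >>= print
```

`race_` tears down both directions as soon as either side disconnects; `concurrently` would instead wait for both directions to finish, which can leave a connection hanging if the backend never closes its side.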
Conclusion
That’s it for our little excursion through `conduit` for today; the weeds aren’t as thick as they might have appeared from afar. As you can see, the library is extremely flexible and provides numerous ways to source, sink, and transform data. It also provides other benefits, in particular expressiveness (although I’ve made some of these examples unnecessarily verbose, for demonstration purposes) and constant memory usage.
Dig into the posts linked at the top to see more examples and to learn more of the nuances of `Conduit`s. Also, do dig into the library documentation! It’s a wealth of knowledge, and the library is very modular, which gives it great flexibility.