Intro to Haskell’s conduit library (Conduit 101)

Haskell’s conduit library provides powerful tools for building streaming data transformations, along with useful guarantees such as constant memory usage (unbounded memory growth being something that often bites Haskell developers). In this post, I’ll walk you through some simple examples of using conduit to transform text, and also demonstrate some useful variations on input and output.

Note: Some of this post was taken indirectly or directly from several posts by conduit author Michael Snoyman and another by Joe Hillenbrand. This post is not meant to replace their (much better) writing. Instead, it is more of a guide to progressing iteratively through the use of different Sources, Sinks, and Conduit types, and it is by no means exhaustive over all the possible applications. For these more in-depth posts, see:

Note 2: All of this code is available on github @ https://github.com/stormont/conduit-examples

…or directly with: git clone https://github.com/stormont/conduit-examples.git

Prerequisites

  • Have GHC version >= 7.8 installed (examples tested against 7.8.3)
  • Have Cabal version >= 1.20 installed (examples tested against 1.20.0.2)
  • These examples have been tested on Windows 8.1 and Amazon Linux AMI
  • telnet or an equivalent tool must be available on the system (for running the later examples)
  • It is strongly recommended to build within a sandbox environment: cabal sandbox init
  • Run cabal configure first to see which libraries you need to install with cabal install library-name

Before we begin, some header stuff. These will change for the later examples; I’ll call this out again at that time.

{-# LANGUAGE RankNTypes #-}
module Main
  where

import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import Control.Monad.State
import Control.Monad.Trans.Resource
import Data.ByteString.Char8 (ByteString, empty, pack, unpack)
import Data.Conduit
import System.IO (stdin)

Building a Conduit

For our first example, we’ll do something trivial: hook up a Source to a Sink. Every conduit pipeline needs a Source and a Sink to operate. We’ll build a Source that takes a String argument, breaks it up into words, and feeds the resulting list of words to a Sink, which returns that list as output.

wordsSource :: Monad m => String -> Producer m [String]
wordsSource = yield . words  -- yield simply pushes its argument down the conduit chain to the next Conduit

identitySink :: Monad m => ConduitM [a] o m [a]
identitySink = CL.foldMap id  -- Simply returns the input 'as is' as output

main = do
  let input = "The quick brown fox jumped over the lazy dog"
  xs <- wordsSource input $$ identitySink  -- Connects our Source to our Sink and captures the output
  mapM_ putStrLn xs

In the github project, this is splitWordsExample under examples/conduit-101-01.

Here, our wordsSource Producer (a more general form of Source) generates a stream of [String]; in this case the stream holds a single element, the full list of words. Our identitySink ConduitM consumes a stream of [a], could in principle produce a stream of o, and returns a final result of [a]. Since identitySink never yields anything, the o goes unused and we simply capture the result.
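Note that wordsSource pushes the entire word list downstream as a single element. As a point of comparison, here is a minimal sketch (not taken from the repository) in which each word is its own stream element. The name eachWordSource is hypothetical, and the code sticks to the same $= and $$ operators used throughout this post; newer conduit releases steer users toward .| and runConduit instead.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (runIdentity)

-- Hypothetical variant of wordsSource: yields one String per stream element
-- instead of a single [String] holding every word.
eachWordSource :: Monad m => String -> Source m String
eachWordSource = mapM_ yield . words

-- Runs the pipeline purely (in the Identity monad) and collects the results.
wordLengths :: String -> [Int]
wordLengths input =
  runIdentity (eachWordSource input $= CL.map length $$ CL.consume)

main :: IO ()
main = print (wordLengths "The quick brown fox")  -- prints [3,5,5,3]
```

With one word per element, intermediate conduits can use plain functions such as length rather than map length, and downstream stages can begin work before the whole input has been seen.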

The conduit stack will terminate when no more data is available in the stream. This termination condition is different than a simple “wait for input”. If a downstream conduit is waiting for input, it will wait indefinitely. But, when an upstream conduit is terminated, the downstream conduits will stop processing. Don’t worry, we’ll show some examples of this later.
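To make the termination rule concrete, here is a small sketch (the name countSink is made up for illustration) of a Sink that keeps awaiting input and stops only when await reports that upstream has finished.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (runIdentity)

-- Counts the elements it receives. await returns Nothing once the upstream
-- Source has terminated, which is the signal to stop and return the total.
countSink :: Monad m => Sink a m Int
countSink = go 0
  where
    go n = do
      mx <- await
      case mx of
        Nothing -> return n       -- upstream is done, so we finish too
        Just _  -> go (n + 1)     -- got a value, keep waiting for more

countOf :: [a] -> Int
countOf xs = runIdentity (CL.sourceList xs $$ countSink)

main :: IO ()
main = print (countOf "abc")  -- prints 3
```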

If the different names confuse you, remember this:

  • Sources produce data (and expect no upstream input)
  • Sinks consume data (and send no data downstream)
  • Producer is a generalization of a Source (it can be used as a Source or as a Conduit)
  • Consumer is a generalization of a Sink (it can be used as a Sink or as a Conduit)
  • Conduits usually refer more directly to the intermediate pieces for everything stacked in between (but in reality, everything is a type of Conduit)

Adding an Intermediate Transformation

Now, we’ll take this example and do something slightly more complex: Adding an additional Conduit in the middle of the stack to transform our words. In this case, we’ll just count the number of characters in each word.

numLettersConduit :: Monad m => Conduit [String] m [Int]
numLettersConduit = CL.map (map length)  -- Transforms each String input by counting the number of characters

main = do
  let input = "The quick brown fox jumped over the lazy dog"
  xs <- wordsSource input
     $= numLettersConduit  -- Plug it in to the middle of the conduit stack
     $$ identitySink
  mapM_ (putStrLn . show) xs

In the github project, this is wordLengthExample_1 under examples/conduit-101-01.

Here, our numLettersConduit Conduit consumes a stream of [String] and outputs a stream of [Int].

That was easy! A quick word on the $= and $$ operators. These operators are rather flexible and can be combined in different ways to have different meanings (there are also a few more extension operators similar to these). But, in this example, $= can be read as “Connect a Source ($) to a Conduit (=)”. Meanwhile, $$ can be read as “Connect a Source ($) to a Sink ($)”. (When you connect multiple Conduits, the upstream Conduits effectively become Sources for the downstream Sinks.)
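As a rough illustration of how these operators combine, the sketch below builds the same pipeline in two ways: once by attaching the Conduit to the Source with $=, and once by attaching it to the Sink with =$ (one of the additional operators alluded to above). The names numbers and doubleC are invented for this example.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (runIdentity)

numbers :: Monad m => Source m Int
numbers = CL.sourceList [1, 2, 3]

doubleC :: Monad m => Conduit Int m Int
doubleC = CL.map (* 2)

-- Source $= Conduit gives a new Source, which $$ then connects to the Sink.
leftFused :: [Int]
leftFused = runIdentity ((numbers $= doubleC) $$ CL.consume)

-- Conduit =$ Sink gives a new Sink, which $$ then connects to the Source.
rightFused :: [Int]
rightFused = runIdentity (numbers $$ (doubleC =$ CL.consume))

main :: IO ()
main = do
  print leftFused   -- prints [2,4,6]
  print rightFused  -- prints [2,4,6]
```

Both groupings produce the same result; which one to use is mostly a question of which piece you want to name and reuse.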

More Transformations

You can plug in a bunch more Conduits along the way to perform additional transformations, as long as you respect the type arguments. Here, we’ll simply build upon the last example by converting our word length count Ints back into Strings.

showConduit :: Monad m => Conduit [Int] m [String]
showConduit = CL.map (map show)  -- Converts each Int in the input list back into a String

main = do
  let input = "The quick brown fox jumped over the lazy dog"
  xs <- wordsSource input
     $= numLettersConduit
     $= showConduit  -- Multiple conduits can be piped together
     $$ identitySink
  mapM_ putStrLn xs

In the github project, this is wordLengthExample_2 under examples/conduit-101-01.

Here, our showConduit Conduit consumes a stream of [Int] and outputs a stream of [String].

As you can see, we simply piped multiple Conduits in to the middle of the stack. This works as long as you adhere to the input and output types of the various Conduits.

Fusing Conduits

Lastly, we can take the last example and “fuse” multiple Conduits together into a single reference:

fusedConduit :: Monad m => Conduit [String] m [String]
fusedConduit = fuse numLettersConduit showConduit  -- Fuses the two references together

main = do
  let input = "The quick brown fox jumped over the lazy dog"
  xs <- wordsSource input
     $= fusedConduit  -- Can now be used as a single reference
     $$ identitySink
  mapM_ putStrLn xs

In the github project, this is wordLengthExample_3 under examples/conduit-101-01.

Here, we simply define a new Conduit, fusedConduit, which consumes a stream of [String] and outputs a stream of [String], by fusing our numLettersConduit and showConduit together. The pair can now be used through the single fusedConduit reference.
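The fuse function is the named form of the =$= operator, so either spelling should give the same conduit. The sketch below checks this on a small in-memory input; lengthsC, showC, and runWith are illustrative names, and the conduits here work on individual Strings rather than on [String].

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (Identity, runIdentity)

lengthsC :: Monad m => Conduit String m Int
lengthsC = CL.map length

showC :: Monad m => Conduit Int m String
showC = CL.map show

-- The same fusion written with the named function and with the operator.
fusedNamed, fusedOperator :: Monad m => Conduit String m String
fusedNamed    = fuse lengthsC showC
fusedOperator = lengthsC =$= showC

-- Feeds a fixed list through the given conduit and collects the output.
runWith :: Conduit String Identity String -> [String]
runWith c =
  runIdentity (CL.sourceList ["The", "quick", "brown"] $= c $$ CL.consume)

main :: IO ()
main = do
  print (runWith fusedNamed)     -- prints ["3","5","5"]
  print (runWith fusedOperator)  -- prints ["3","5","5"]
```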

Variations on using stdin/stdout

Now, let’s get in to some variations on using standard stream handles with our conduits.

Getting Input from stdin

A simple tweak on our above examples to get user input using stdin:

main = do
  input <- getLine
  xs <- wordsSource input
     $$ identitySink
  mapM_ putStrLn xs

In the github project, this is userInputExample_1 under examples/conduit-101-01.

There’s nothing special or conduit-y about this. But, it sets us up for the next examples.

Moving stdin and stdout Within our Conduits

userInputSource :: Producer IO String
userInputSource = do
  let
    loop acc = do
      c <- liftIO getChar            -- Get a character of user input
      case c of
        ' ' -> do                    -- On a space, we'll push what we have so far
          yield $ reverse acc        -- We yield the collection downstream
          loop []                    -- We reset the collection and loop
        '\n' -> yield $ reverse acc  -- On a newline, we'll push what we have and stop looping
        _ -> loop (c : acc)          -- We collect the character and loop
  loop []                            -- We simply start the loop

identityStdOutSink :: Consumer String IO ()
identityStdOutSink = awaitForever $ liftIO . putStrLn  -- For each String input, we'll output it to the console

main = do
  _ <- userInputSource
    $$ identityStdOutSink
  return ()

In the github project, this is userInputExample_2 under examples/conduit-101-01.

Consumer is a generalization of Sink; here, it receives a stream of String input, produces no stream output, and returns a () result.

What we’re doing here is looping on input in our Source and only pushing data downstream when we hit a space or newline character. Our Sink is simply taking each element from the input stream and writing it out as a new line to stdout. awaitForever will wait for each “chunk” of input from the stream, until the stream terminates.

Sending Multiple Streams of Input Downstream

Of course, we don’t necessarily have to terminate right away. Here, we play with the input a bit, pushing as much downstream as we like, until an empty line is input:

userInputConduitSource :: Source IO [String]
userInputConduitSource = do
     CB.sourceHandle stdin   -- Using a handle to stdin
  $= CB.lines                -- ...stream lines of input
  $= loop                    -- ...to our internal loop conduit
  where
    loop = do
      liftIO $ putStr "> "
      r <- yieldStrings      -- Yield each line downstream
      case r of
        False -> return ()   -- If we've hit an empty line of input, stop
        True -> loop         -- Otherwise, don't terminate the loop conduit

yieldStrings :: ConduitM ByteString [String] IO Bool
yieldStrings = do
  mbs <- await                -- Wait for stream input
  case mbs of
    Nothing -> return False   -- If we don't get any input, signal termination
    Just bs -> do
      let s = takeWhile (\x -> x /= '\r' && x /= '\n')
            $ unpack bs       -- Unpack the ByteString and trim off trailing newline characters
      case s of
        ""   -> return False  -- If we get empty input, signal termination
        ss   -> do            -- Otherwise, break the input into words and yield downstream
          yield $ words ss
          return True         -- ...and signal non-termination

main = do
  _ <- userInputConduitSource
    $$ identitiesStdOutSink
  return ()

In the github project, this is userInputExample_3 under examples/conduit-101-01.
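The identitiesStdOutSink used in main is defined in the repository rather than shown in this post. As a guess at its shape (an assumption, not the repository’s actual code), a sink that accepts each [String] and prints every word on its own line could look like this:

```haskell
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Assumed definition: for each incoming list of words, print each word on
-- its own line. Only MonadIO is required, so it also runs under ResourceT.
identitiesStdOutSink :: MonadIO m => Sink [String] m ()
identitiesStdOutSink = awaitForever $ liftIO . mapM_ putStrLn

main :: IO ()
main = CL.sourceList [["The", "quick"], ["brown", "fox"]] $$ identitiesStdOutSink
```

Because the constraint is just MonadIO, a definition along these lines could stand in wherever a resource-aware variant such as identitiesResStdOutSink is called for as well.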

This one is a little bit more complex, in order to demonstrate some of the conduit library functions available for consuming input from stdin, as well as for performing custom transformations within a conduit. The CB.-prefixed functions come from Data.Conduit.Binary; there’s a lot of useful stuff in there. I recommend you read the documentation (it’s pretty well written, too).

The await function will hang until the stream provides some input, at which point you can perform custom handling. But, unlike awaitForever, this won’t loop automatically! If you don’t provide some looping mechanism (as we’ve done by re-calling yieldStrings from loop), this stream will terminate here.
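The difference between the two is easiest to see side by side. In this sketch (all three conduit names are invented for illustration), the single-await version handles one element and stops, while adding explicit recursion recovers the behavior of awaitForever.

```haskell
import Data.Char (toUpper)
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (Identity, runIdentity)

-- awaitForever re-runs its body for every element until upstream ends.
shoutAll :: Monad m => Conduit String m String
shoutAll = awaitForever (yield . map toUpper)

-- A single await handles at most one element, then the conduit finishes.
shoutOnce :: Monad m => Conduit String m String
shoutOnce = do
  mx <- await
  case mx of
    Nothing -> return ()
    Just x  -> yield (map toUpper x)

-- Recursing after each element gives the same result as awaitForever.
shoutLoop :: Monad m => Conduit String m String
shoutLoop = do
  mx <- await
  case mx of
    Nothing -> return ()
    Just x  -> yield (map toUpper x) >> shoutLoop

run :: Conduit String Identity String -> [String]
run c = runIdentity (CL.sourceList ["a", "b", "c"] $= c $$ CL.consume)

main :: IO ()
main = do
  print (run shoutAll)   -- prints ["A","B","C"]
  print (run shoutOnce)  -- prints ["A"]
  print (run shoutLoop)  -- prints ["A","B","C"]
```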

The other tidbit we’ve shown here is that we can embed conduits within other conduits! This is very handy when you need to perform some form of buffering within a transformation conduit, as we’ve done here.

Using File Streams

Of course, conduit comes with some excellent accessors to file streams, built-in. Let’s see how these can be used.

Sourcing Input by Reading a File

Here, we’ll read input from a file within a Source, then send the data downstream to be output to stdout:

fileInputSource :: (Monad m, MonadResource m) => FilePath -> Source m [String]
fileInputSource file = do
     CB.sourceFile file              -- Reads input from the file
  $= loop                            -- ...and feeds it to our loop
  where
    loop = do
      mbs <- await                   -- Await the next chunk of file input
      case mbs of
        Nothing -> return ()         -- On no input, terminate
        Just bs -> do
          yield $ words $ unpack bs  -- Otherwise, split the input into words and push it downstream
          loop                       -- ...and continue looping

main = do
  let file = "examples/conduit-101-01/words.txt"
  _ <- runResourceT                  -- Protect our file handle from exceptions
    $  fileInputSource file          -- ...and fire up our conduit
    $$ identitiesResStdOutSink
  return ()

In the github project, this is filesExample_1 under examples/conduit-101-01. If you examine the input file, you’ll see the same “lazy dog” text example we’ve already been working with.

We haven’t done much different here from the stdin example above. We’ve simply used the CB library function sourceFile to read input from a file, rather than stdin.
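One thing to keep in mind is that sourceFile hands over the file in chunks whose boundaries are not aligned to words or lines, so splitting each chunk with words can cut a word in two on larger files. A possible refinement, sketched here with simulated in-memory chunks rather than a real file, is to pass the stream through CB.lines before splitting. The names chunks, perChunk, and perLine are illustrative only.

```haskell
import Data.ByteString.Char8 (ByteString, pack, unpack)
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (runIdentity)

-- Simulated file chunks; the word "quick" straddles the chunk boundary.
chunks :: [ByteString]
chunks = map pack ["The qu", "ick brown\nfox"]

-- Splitting each chunk independently breaks "quick" into two pieces.
perChunk :: [[String]]
perChunk =
  runIdentity (CL.sourceList chunks $= CL.map (words . unpack) $$ CL.consume)

-- Reassembling complete lines first keeps every word intact.
perLine :: [[String]]
perLine =
  runIdentity
    (CL.sourceList chunks $= CB.lines $= CL.map (words . unpack) $$ CL.consume)

main :: IO ()
main = do
  print perChunk  -- prints [["The","qu"],["ick","brown","fox"]]
  print perLine   -- prints [["The","quick","brown"],["fox"]]
```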

The other new feature is the use of runResourceT. This runs the enclosed computation in a monad that satisfies MonadResource, which guarantees that resources it acquires (here, the file handle) are released when the computation finishes, even if an exception is thrown along the way. This is always a good plan, as file and network I/O can often be fickle.
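To see the resource handling at work without touching the file system, here is a sketch that uses bracketP (the conduit function for acquiring a resource inside a MonadResource conduit) with an IORef standing in for a file handle. The names trackedSource and runTracked are hypothetical. The sink takes only three of the ten values, yet the cleanup action has still run by the time runResourceT returns.

```haskell
import Control.Monad.Trans.Resource (MonadResource, runResourceT)
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- A Source whose "resource" is just a log of acquire/release events.
trackedSource :: MonadResource m => IORef [String] -> Source m Int
trackedSource events =
  bracketP
    (modifyIORef events ("acquired" :))        -- runs when the source starts
    (\_ -> modifyIORef events ("released" :))  -- cleanup registered with ResourceT
    (\_ -> mapM_ yield [1 .. 10])

-- Consumes only the first three values, then reports what happened.
runTracked :: IO ([Int], [String])
runTracked = do
  events  <- newIORef []
  xs      <- runResourceT $ trackedSource events $$ CL.take 3
  history <- readIORef events
  return (xs, reverse history)

main :: IO ()
main = runTracked >>= print  -- prints ([1,2,3],["acquired","released"])
```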

Sinking Output to a File

Now that we’ve demonstrated how to read input from a file, let’s show how you can stream output to a file:

conduitToByteString :: Monad m => Conduit [String] m ByteString
conduitToByteString = awaitForever $ yield . pack . unlines  -- Takes the input [String] and yields it as a single ByteString downstream

main = do
  let input = "examples/conduit-101-01/words.txt"
  let output = "examples/conduit-101-01/output.txt"
  _ <- runResourceT
    $  fileInputSource input
    $= conduitToByteString
    $$ CB.sinkFile output     -- Writes the input stream to a file
  return ()

In the github project, this is filesExample_2 under examples/conduit-101-01. After running this example, you should see an examples/conduit-101-01/output.txt file that contains the original input, with each word on its own line.

This example is also pretty simple. We’re just taking our Sink stream and writing it to a file, using the CB function sinkFile. The only caveat is that the sinkFile function expects a ByteString, so we first transform the [String] input into a ByteString.

Streaming Multiple Input Files to a Single Output

As a last variation on file I/O, let’s see how we can take input from multiple files and stream it to stdout:

multiFileInputSource :: MonadResource m => [FilePath] -> Source m FilePath
multiFileInputSource files = mapM_ yield files  -- Yields the complete set of files downstream

conduitFile :: MonadResource m => Conduit FilePath m ByteString
conduitFile = awaitForever CB.sourceFile  -- For each FilePath input, yield its file contents downstream

byteStringStdOutSink :: MonadResource m => Consumer ByteString m ()
byteStringStdOutSink = awaitForever $ liftIO . putStrLn . unpack  -- Simply takes each input ByteString and outputs it to stdout

main = do
  let input1 = "examples/conduit-101-01/words.txt"
  let input2 = "examples/conduit-101-01/lengths.txt"
  _ <- runResourceT
    $  multiFileInputSource [input1, input2]
    $= conduitFile
    $$ byteStringStdOutSink
  return ()

In the github project, this is filesExample_3 under examples/conduit-101-01.

This also isn’t very complex. We’ve simply taken the set of files as our initial stream of inputs, then for each one, read its contents and yielded them downstream.
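conduitFile works because the body handed to awaitForever can itself be a Source, so a single input element may expand into any number of output elements. The sketch below shows the same shape with in-memory data instead of files; expandC is a made-up name.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (runIdentity)

-- For each incoming n, run an inner source that yields n copies of n.
-- This mirrors conduitFile, where each FilePath expands into file chunks.
expandC :: Monad m => Conduit Int m Int
expandC = awaitForever $ \n -> CL.sourceList (replicate n n)

expanded :: [Int]
expanded = runIdentity (CL.sourceList [1, 2, 3] $= expandC $$ CL.consume)

main :: IO ()
main = print expanded  -- prints [1,2,2,3,3,3]
```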

What About Network I/O?

We’ve now covered standard streams and file I/O. But, what about networking? Luckily, there’s a conduit library for that: Data.Conduit.Network (within the conduit-extra package).

Reading Input from a Socket

For the first variation, we’ll read input from a socket and output it to stdout. This involves new header imports and such:
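The header listing for the network examples is not reproduced in this post. Judging only from the names used in the code that follows, a plausible header might look like the fragment below. Treat the exact import list as an assumption, and consult the repository for the real one.

```haskell
{-# LANGUAGE OverloadedStrings #-}  -- assumed: lets "*" and "localhost" be used as settings values
module Main
  where

-- Assumed imports, inferred from the identifiers the network examples use.
import qualified Data.ByteString as B                -- B.ByteString, B.putStr
import Control.Monad.IO.Class (liftIO)
import Data.ByteString.Char8 (ByteString, unpack)    -- used by yieldStrings
import Data.Conduit
import Data.Conduit.Network
  ( appSink, appSource, clientSettings
  , runTCPClient, runTCPServer, serverSettings )
import Network.Socket (withSocketsDo)
```

The yieldStrings helper from the stdin section would also need to be in scope for the server example.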

And the code:

splitWords :: ConduitM ByteString [String] IO ()
splitWords = do
  loop
  where
    loop = do
      r <- yieldStrings
      case r of
        False -> return ()
        True  -> loop

main = do
  withSocketsDo $ do  -- On Windows, we need this to properly enable sockets; on *nix, this is a safe no-op
    runTCPServer (serverSettings 4000 "*") $ \appData -> do  -- Runs a TCP server at port 4000; for each input...
         appSource appData     -- Stream the input from its source socket
      $= splitWords            -- ...do the same line-to-word-split conversion we've seen previously
      $$ identitiesStdOutSink  -- ...and output to stdout

In the github project, this is main under examples/conduit-101-02. Hang on to this one, we’ll be re-using it in the next examples, too.

You’ll need to use telnet or a similar tool in order to talk to the server; you can fire this up with:

telnet 127.0.0.1 4000

There are really only two new pieces here. The library function runTCPServer starts up a TCP server (in this case, on port 4000) and, for each client connection, runs our handler with the connection passed in as the variable appData. To get the stream of ByteString input from that connection, you call the library function appSource. The serverSettings helper function allows more fine-grained control over the server configuration (check out the documentation for more details).

splitWords here is nearly the same as the userInputConduitSource we saw previously, but is a bit simpler (relevant processing has already been done by appSource).

A caveat to this example: To mimic the prior examples, it’s designed for the server to stop reading input as soon as a newline is read. This may surprise you in this and the later examples, if you forget about this detail.

Note: We should always use withSocketsDo in cases where we ever might run on Windows. Otherwise, you’ll get not-so-helpful error messages when you try to start the server, such as:

getAddrInfo: does not exist (error 10093)

Writing Data to a Socket

So, that’s reading from a socket. What about writing to a socket?

main = do
  withSocketsDo $ do
    runTCPClient (clientSettings 4000 "localhost") $ \server -> do  -- Fire up a TCP client
         yield "hello world\n"  -- ...yield a string downstream
      $$ appSink server         -- ...which is sunk to the server socket

In the github project, this is main under examples/conduit-101-03. You’ll first want to fire up the example server from examples/conduit-101-02, in order to have a socket to talk to.

Again, pretty simple. We just fire up a TCP client this time with the library function runTCPClient. This time, with our server connection, we’ll yield data downstream, with which we use the library function appSink to sink the data to the socket. The library function clientSettings just helps configure the connection to the server (it does need to match the host:port parameters of the server).

The trailing newline on our yielded stream is just to demonstrate the nuance with our examples/conduit-101-02 server terminating reading input after a newline character is read.

Reading and Writing to Sockets (Let’s Make a Proxy!)

Note: This is gratuitously stolen from Michael Snoyman’s proxy example. For more in-depth fun, see his post and build ANGRY NETWORK CONDUITS.

We’re going to read input from a socket, output it to our local stdout, then stream the data to another socket:

echo :: ConduitM B.ByteString B.ByteString IO ()
echo = do
  awaitForever $ \x -> do
    liftIO $ B.putStr x  -- Just output the stream to stdout
    yield x              -- ...before yielding it downstream

main = do
  withSocketsDo $ do
    runTCPServer (serverSettings 4002 "*") $ \client ->
      runTCPClient (clientSettings 4000 "localhost") $ \server -> do
        (appSource client $= echo $$ appSink server)  -- Receive input from the client, echo it out, and sink it to the server

In the github project, this is main under examples/conduit-101-04. You’ll first want to fire up the example server from examples/conduit-101-02, in order to have a socket to talk to as the “real” socket. You’ll also want to fire up a telnet 127.0.0.1 4002 client to talk to the proxy layer (this piece).

We haven’t really introduced anything new to this example; we’ve simply combined the two previous examples. We fire up a TCP server for the proxy layer on port 4002, as well as connect as a client to our already-running server on port 4000 (you did fire up the previous examples/conduit-101-02 example, right?). Then, once our telnet client connects to our new proxy server on port 4002, we’ll shuttle messages to the real server on port 4000.

Conclusion

That’s it for our little excursion through conduit for today – the weeds aren’t as thick as they might have appeared from afar. As you can see, the library is extremely flexible and provides numerous ways to source, sink, and transform data. It also provides a number of other benefits; in particular, expressiveness (although I’ve made some of these examples unnecessarily verbose, for demonstration purposes) and constant memory usage.

Dig into the linked posts I added at the top to see more examples and to learn more of the nuances of Conduits. Also, do dig into the library documentation! It’s a wealth of knowledge and very modular for great flexibility.