vigoo's software development blog

prox part 2 - akka streams with cats effect

Posted on March 07, 2019

Intro

In the previous post we saw how prox applies advanced type-level programming techniques to express the execution of external system processes. The input and output of these processes can be connected to streams. The current version of prox uses the fs2 library to describe these streams, and cats-effect as an IO abstraction, allowing it to separate the specification of a process pipeline from its actual execution.

In this post we will keep cats-effect but replace fs2 with the stream library of the Akka toolkit, Akka Streams. This will be a hybrid solution, as Akka Streams does not use any kind of IO abstraction, unlike fs2, which is implemented on top of cats-effect. We will experiment with implementing prox purely with the Akka libraries in a future post.

Replacing fs2 with Akka Streams

We start by removing the fs2 dependency and adding Akka Streams:

- "co.fs2" %% "fs2-core" % "1.0.3",
- "co.fs2" %% "fs2-io" % "1.0.3",

+ "com.typesafe.akka" %% "akka-stream" % "2.5.20",

Then we have to change all the fs2 types used in the codebase to the matching Akka Streams types. The following table describes these pairs:

fs2              | Akka Streams
Stream[IO, O]    | Source[O, Any]
Pipe[IO, I, O]   | Flow[I, O, Any]
Sink[IO, O]      | Sink[O, Future[Done]]

Another small difference that requires changing a lot of our functions is the implicit context these streaming solutions require.

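With the original implementation it used to be an implicit ContextShift[IO] instance and an explicitly passed blocking execution context of type ExecutionContext.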

We can treat the blocking execution context as part of the implicit context for prox too, and could refactor the library to pass both of them wrapped together within a context object.

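The Akka Streams based implementation still needs the implicit ContextShift[IO], because we keep using cats-effect, and in addition it requires an implicit Materializer and an implicit ExecutionContext.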

So, for example, the start extension method is modified like this:

- def start[RP](blockingExecutionContext: ExecutionContext)
               (implicit start: Start.Aux[PN, RP, _], 
                contextShift: ContextShift[IO]): IO[RP]
+ def start[RP]()
               (implicit start: Start.Aux[PN, RP, _],
                contextShift: ContextShift[IO],
                materializer: Materializer,
                executionContext: ExecutionContext): IO[RP]

It turns out that there is one more minor difference that needs changes in the internal type signatures.

In Akka Streams, byte streams are not represented as streams of element type Byte, like in fs2, but as streams of chunks called ByteStrings. So everywhere we used Byte as the element type, such as on the process boundaries, we now simply have to use ByteString, for example:

- def apply(from: PN1, to: PN2, via: Pipe[IO, Byte, Byte]): ResultProcess 
+ def apply(from: PN1, to: PN2, via: Flow[ByteString, ByteString, Any]): ResultProcess 

Another thing to notice is that fs2 had a type parameter for passing the IO monad to run on. As I wrote earlier, Akka Streams does not depend on such abstractions, so this parameter is missing. On the other hand, it has a third type parameter which is set in the above example to Any. This parameter is called Mat and represents the type of the value the flow will materialize to. At this point we don't care about it so we set it to Any.
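To make the role of the Mat parameter more concrete, here is a minimal sketch that is not taken from prox. It assumes Akka 2.5 with an ActorMaterializer, and the names MatExample, upperCase and demo are made up for this illustration. The flow itself materializes to NotUsed, which is why it can be used wherever a Flow[ByteString, ByteString, Any] is expected, while the Sink.fold at the end materializes to the Future we actually care about.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of prox
object MatExample {
  // A flow over ByteString chunks; it materializes to NotUsed.
  // (Upper-casing chunk by chunk is only safe for ASCII input.)
  val upperCase: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].map(chunk => ByteString(chunk.utf8String.toUpperCase))

  // Flow is covariant in Mat, so the same value fits the "Any" signature
  val asAny: Flow[ByteString, ByteString, Any] = upperCase

  def demo(): String = {
    implicit val system: ActorSystem = ActorSystem("mat-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val chunks = Source(List(ByteString("hello "), ByteString("world")))
      // Keep.right keeps the sink's materialized value and drops the rest
      val result: Future[ByteString] =
        chunks
          .via(asAny)
          .toMat(Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _))(Keep.right)
          .run()
      Await.result(result, 5.seconds).utf8String
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(demo())
}
```

Only the last stage decides what the whole graph materializes to here, so the intermediate flows can keep Any as their Mat type.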

Let's take a look at the connect function of the ProcessIO trait. With fs2 the InputStreamingSource is implemented like this:

class InputStreamingSource(source: Stream[IO, Byte]) extends ProcessInputSource {
    override def toRedirect: Redirect = Redirect.PIPE
    
    override def connect(systemProcess: lang.Process, blockingExecutionContext: ExecutionContext)
                        (implicit contextShift: ContextShift[IO]): Stream[IO, Byte] = {
        source.observe(
            io.writeOutputStream[IO](
                IO { systemProcess.getOutputStream },
                closeAfterUse = true,
                blockingExecutionContext = blockingExecutionContext))
    }

    override def run(stream: Stream[IO, Byte])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Unit]] =
        Concurrent[IO].start(stream.compile.drain) 
}

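We have a source stream and, during the setup of the process graph, when the system process has already been created, we have to set up the redirection of this source stream to this process. This is separated into a connect and a run step. The connect step only defines a stream that observes the source and writes its bytes to the standard input of the system process, without performing any IO. The run step, with its result type IO[Fiber[IO, Unit]], is the effectful operation that starts running this stream on a concurrent fiber.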

In the case of fs2 we can be sure that the source.observe function is pure just by checking its type signature:

def observe(p: Pipe[F, O, Unit])(implicit F: Concurrent[F]): Stream[F, O]

All side-effecting functions in fs2 are defined as IO functions, so we simply know that this one is not among them, and that's why the connect was a pure, non-IO function in the original implementation. With Akka Streams we don't have any information about this encoded in the type system. We use the source.alsoTo function:

def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out]

which is actually also pure (it only creates a blueprint of the graph to be executed), so we can safely replace the implementation with this in the Akka Streams version:

class InputStreamingSource(source: Source[ByteString, Any]) extends ProcessInputSource {
    override def toRedirect: Redirect = Redirect.PIPE

    override def connect(systemProcess: lang.Process)(implicit contextShift: ContextShift[IO]): Source[ByteString, Any] =
        source.alsoTo(fromOutputStream(() => systemProcess.getOutputStream, autoFlush = true))

    override def run(stream: Source[ByteString, Any])
                    (implicit contextShift: ContextShift[IO],
                     materializer: Materializer,
                     executionContext: ExecutionContext): IO[Fiber[IO, Unit]] = {
        Concurrent[IO].start(IO.async { finish =>
            stream.runWith(Sink.ignore).onComplete {
                case Success(Done) => finish(Right(()))
                case Failure(reason) => finish(Left(reason))
            }
        })
    }
}
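The claim that alsoTo only builds a blueprint can be checked with a small sketch. This is not code from prox: it assumes Akka 2.5, uses a ByteArrayOutputStream as a stand-in for the standard input of a system process, and uses alsoToMat instead of alsoTo only so that the demo can wait for the output stream sink to finish. The names BlueprintExample and demo are hypothetical.

```scala
import java.io.ByteArrayOutputStream
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{Keep, Sink, Source, StreamConverters}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of prox
object BlueprintExample {
  // Returns the buffer size before running the graph and its content after
  def demo(): (Int, String) = {
    implicit val system: ActorSystem = ActorSystem("blueprint-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Stand-in for systemProcess.getOutputStream
      val buffer = new ByteArrayOutputStream()
      val source = Source(List(ByteString("abc"), ByteString("def")))

      // Describing the graph: nothing is written at this point
      val observed: Source[ByteString, Future[IOResult]] =
        source.alsoToMat(
          StreamConverters.fromOutputStream(() => buffer, autoFlush = true)
        )(Keep.right)
      val sizeBeforeRun = buffer.size()

      // Running the graph: only now do the bytes reach the output stream
      val written: Future[IOResult] =
        observed.toMat(Sink.ignore)(Keep.left).run()
      Await.result(written, 5.seconds)

      (sizeBeforeRun, buffer.toString("UTF-8"))
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(demo())
}
```

The buffer stays empty until the graph is run, which is the property that lets connect remain a non-IO function.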

The implementation of run above is a nice example of how we can integrate asynchronous operations that are not implemented with cats-effect into an IO based program. With IO.async we define how to start the asynchronous operation (in this case running the Akka stream) and we get a callback function, finish, to be called when the asynchronous operation ends. Running the stream with Sink.ignore materializes it to a Future[Done] value, so we can use its onComplete function to notify the IO system about the finished stream. The IO value returned by IO.async represents the whole asynchronous operation: it returns its final result when the callback is called, and "blocks" the program flow until then. This does not mean actually blocking a thread, but the next IO action will be executed only after this one has finished running (as its type is IO[A]). That is not what we need here, so we use Concurrent[IO].start to put this IO action on a separate fiber. This way all streams involved in the process graph will be executing in parallel.
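The same bridging pattern can be tried without Akka at all. The following is a minimal sketch assuming cats-effect 1.x or 2.x (the versions that still have ContextShift); fromFutureCallback and AsyncBridgeExample are hypothetical names, and a plain Future stands in for the running stream.

```scala
import cats.effect.{ContextShift, IO}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical example object, not part of prox
object AsyncBridgeExample {
  implicit val executionContext: ExecutionContext = ExecutionContext.global
  implicit val contextShift: ContextShift[IO] = IO.contextShift(executionContext)

  // Wraps the start of a Future-based operation in IO.
  // The Future is only created when the IO value is actually run.
  def fromFutureCallback[A](startOperation: () => Future[A]): IO[A] =
    IO.async { callback =>
      startOperation().onComplete {
        case Success(value)  => callback(Right(value))
        case Failure(reason) => callback(Left(reason))
      }
    }

  def demo(): Int = {
    val program: IO[Int] =
      for {
        // start puts the asynchronous operation on its own fiber ...
        fiber <- fromFutureCallback(() => Future(21 * 2)).start
        // ... and join waits for its result without blocking a thread
        result <- fiber.join
      } yield result

    program.unsafeRunSync()
  }

  def main(args: Array[String]): Unit =
    println(demo())
}
```

Between start and join the program is free to start other fibers, which is how several streams of a process graph can run side by side.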

Calculating the result

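prox supports multiple ways to calculate the result of running a process graph: running the output into a sink, folding the output elements into a single value with a Monoid instance, or collecting them into a Vector.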

These cases can be enforced by the Drain, ToVector and Fold wrapper classes.

Let's see how we can implement them with Akka Streams compared to fs2.

Drain sink

The sink version was implemented like this with fs2:

Concurrent[IO].start(stream.compile.drain)

With Akka Streams there is one big difference. In fs2 the sink is represented as a Pipe[F, E, Unit], so we could treat it in the same way as other stream segments. In this case the Sink is not a Flow, so we do a trick to keep the interface as close to the original one as possible:

create((sink: Sink[ByteString, Future[R]]) =>
    new OutputStreamingTarget(Flow.fromFunction(identity)) with ProcessOutputTarget[ByteString, R] {
        override def run(stream: Source[ByteString, Any])
                        (implicit contextShift: ContextShift[IO],
                         materializer: Materializer,
                         executionContext: ExecutionContext): IO[Fiber[IO, R]] =
            Concurrent[IO].start(IO.async { complete =>
                stream.runWith(sink).onComplete {
                    case Success(value) => complete(Right(value))
                    case Failure(reason) => complete(Left(reason))
                }
            })
    })

The trick is that we create the OutputStreamingTarget with an identity flow, and only use the Sink when we actually run the stream, passing it to the runWith function. This materializes the stream into the sink's Future[R] value, which we can tie back to our IO system with IO.async as described above.

Combine with Monoid

When the element type is a monoid we can fold the stream into a single value. fs2 directly supports this:

Concurrent[IO].start(stream.compile.foldMonoid)

Akka Streams does not use cats type classes, but it also has a way to fold the stream, so we can easily implement it using the monoid instance:

Concurrent[IO].start(IO.async { complete =>
    stream.runFold(monoid.empty)(monoid.combine).onComplete {
        case Success(value) => complete(Right(value))
        case Failure(reason) => complete(Left(reason))
    }
})
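As a standalone illustration of the fold itself, here is a minimal sketch assuming Akka 2.5 and cats on the classpath. The helper foldAll and the object MonoidFoldExample are made-up names, and the String monoid from cats is used only as an example instance.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import cats.Monoid
import cats.instances.string._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of prox
object MonoidFoldExample {
  // Folds all elements of the source using the Monoid instance of A
  def foldAll[A](source: Source[A, Any])
                (implicit monoid: Monoid[A], materializer: Materializer): Future[A] =
    source.runFold(monoid.empty)(monoid.combine)

  def demo(): String = {
    implicit val system: ActorSystem = ActorSystem("monoid-fold-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // String's monoid is concatenation with the empty string as identity
      Await.result(foldAll(Source(List("a", "b", "c"))), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(demo())
}
```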

Vector of elements

Finally let's see the version that keeps all the stream elements in a vector as a result:

Concurrent[IO].start(stream.compile.toVector)

With Akka Streams we can do it by running the stream into a sink created for this, Sink.seq. It materializes into a Future[Seq[T]] value that holds all the elements of the executed stream:

Concurrent[IO].start(IO.async { complete =>
    stream.runWith(Sink.seq).onComplete {
        case Success(value) => complete(Right(value.toVector))
        case Failure(reason) => complete(Left(reason))
    }
})

Testing

At this point the only remaining thing is to modify the tests too. One of the more complex examples is the customProcessPiping test case. With fs2 it takes advantage of some text processing pipe elements coming with the library:

val customPipe: Pipe[IO, Byte, Byte] =
    (s: Stream[IO, Byte]) => s
    .through(text.utf8Decode)
    .through(text.lines)
    .map(_.split(' ').toVector)
    .map(v => v.map(_ + " !!!").mkString(" "))
    .intersperse("\n")
    .through(text.utf8Encode)

val proc = Process("echo", List("This is a test string"))
            .via(customPipe)
            .to(Process("wc", List("-w")) > text.utf8Decode[IO])

There are similar tools in Akka Streams to express this in the Framing module:

val customPipe = Framing.delimiter(
        delimiter = ByteString("\n"),
        maximumFrameLength = 10000,
        allowTruncation = true
    ).map(_.utf8String)
    .map(_.split(' ').toVector)
    .map(v => v.map(_ + " !!!").mkString(" "))
    .intersperse("\n")
    .map(ByteString.apply)

val proc = Process("echo", List("This is a test string"))
            .via(customPipe)
            .to(Process("wc", List("-w")) > utf8Decode)

where utf8Decode is a helper sink defined as:

val utf8Decode: Sink[ByteString, Future[String]] =
    Flow[ByteString]
        .reduce(_ ++ _)
        .map(_.utf8String)
        .toMat(Sink.head)(Keep.right)

First it concatenates the ByteString chunks, then simply calls .utf8String on the result.
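The pipe and the helper sink can also be exercised without any external process. The sketch below assumes Akka 2.5 and feeds the text through an in-memory Source split into two chunks, in place of the output of echo; FramingExample and demo are hypothetical names, and ByteString(_) is written out explicitly instead of ByteString.apply only to keep the overload choice obvious.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Framing, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of prox
object FramingExample {
  // Concatenates all chunks, then decodes the result as UTF-8
  val utf8Decode: Sink[ByteString, Future[String]] =
    Flow[ByteString]
      .reduce(_ ++ _)
      .map(_.utf8String)
      .toMat(Sink.head)(Keep.right)

  // Splits the byte stream into lines and appends " !!!" to each word
  val customPipe: Flow[ByteString, ByteString, Any] =
    Framing.delimiter(
      delimiter = ByteString("\n"),
      maximumFrameLength = 10000,
      allowTruncation = true
    ).map(_.utf8String)
      .map(_.split(' ').toVector)
      .map(v => v.map(_ + " !!!").mkString(" "))
      .intersperse("\n")
      .map(ByteString(_))

  def demo(): String = {
    implicit val system: ActorSystem = ActorSystem("framing-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // The line is deliberately split across two chunks
      val input = Source(List(ByteString("This is a "), ByteString("test string\n")))
      Await.result(input.via(customPipe).runWith(utf8Decode), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(demo())
}
```

Framing.delimiter reassembles the line regardless of where the chunk boundaries fall, which is what makes it a replacement for the text.lines pipe of fs2 in this test.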

Final thoughts

We have seen that it is relatively easy to replace the stream library in prox without changing its interface much, if we keep cats-effect for expressing the effectful computations. The complete working example is available on the akka-streams branch.