vigoo's software development blog

prox part 3 - effect abstraction and ZIO

Posted on August 13, 2019

Intro

The first post introduced the prox library and demonstrated the advanced type level programming techniques it uses. Then, in the second part of this series, we experimented with replacing the streaming library, switching from fs2 to Akka Streams.

In both cases the library used cats-effect for describing side effects. But it did not really take advantage of cats-effect's effect abstraction: it explicitly defined everything to be a computation in IO, cats-effect's own data type for describing effectful computations.

But we can do better! By relying on the various type classes the cats-effect library provides instead of on IO itself, we can make prox work with any kind of effect library out of the box. One such example is ZIO.

Effect abstraction

Let's see an example of how IO used to be used in the library! The following function is in the Start type class, and it starts a process or piped process group:

def apply(process: PN, dontStartOutput: Boolean = false, blocker: Blocker)
         (implicit contextShift: ContextShift[IO]): IO[RunningProcesses]

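We can observe two things here: the function returns its result as a computation in IO, and it requires an implicit ContextShift[IO].

To make it independent of the effect library implementation we have to get rid of IO and use a generic type instead. Let's call it F: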

def apply(process: PN, 
          dontStartOutput: Boolean = false, 
          blocker: Blocker)
         (implicit
          concurrent: Concurrent[F],
          contextShift: ContextShift[F]): F[RunningProcesses]

Besides using F instead of IO everywhere, we also have a new requirement: our context type (F) has to have an implementation of the Concurrent type class.
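To make the idea concrete, here is a minimal sketch that is not taken from prox. It assumes cats-effect 2.x (the version line that has ContextShift and Blocker); fileSize and GenericEffectExample are hypothetical names, and the example only needs Sync instead of Concurrent because it does not start any fibers:

```scala
import java.nio.file.{Files, Path}

import cats.effect.{Blocker, ContextShift, IO, Sync}

import scala.concurrent.ExecutionContext

object GenericEffectExample {
  // Hypothetical helper: all it knows about F is that Sync and
  // ContextShift instances exist for it. IO is never mentioned here.
  def fileSize[F[_]: Sync: ContextShift](blocker: Blocker, path: Path): F[Long] =
    blocker.delay[F, Long](Files.size(path))

  // Only the caller decides what F is. Here we pick cats-effect's IO.
  implicit val contextShift: ContextShift[IO] =
    IO.contextShift(ExecutionContext.global)

  def run(): Long = {
    val path = Files.createTempFile("prox-example", ".txt")
    Files.write(path, "hello".getBytes("UTF-8"))
    try Blocker[IO].use(blocker => fileSize[IO](blocker, path)).unsafeRunSync()
    finally Files.delete(path)
  }
}
```

Calling GenericEffectExample.run() evaluates the generic function with F fixed to IO; any other type with the same type class instances could be used in its place.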

Cats-effect defines a hierarchy of type classes to deal with effectful computations. At the time of writing it looks like this:

[Figure: the cats-effect type class hierarchy]

Read the official documentation for more information.

Prox is based on the ProcessNode type, which has two implementations: a single Process, or a set of processes piped together into a PipedProcess. Because these types store their I/O redirection within themselves, they also have to be enriched with a context type parameter.

For example Process will look like this:

class Process[F[_], Out, Err, OutResult, ErrResult, IRS <: RedirectionState, ORS <: RedirectionState, ERS <: RedirectionState]
(val command: String,
 val arguments: List[String],
 val workingDirectory: Option[Path],
 val inputSource: ProcessInputSource[F],
 val outputTarget: ProcessOutputTarget[F, Out, OutResult],
 val errorTarget: ProcessErrorTarget[F, Err, ErrResult],
 val environmentVariables: Map[String, String],
 val removedEnvironmentVariables: Set[String])
  extends ProcessNode[Out, Err, IRS, ORS, ERS] {
    // ...
}

The context parameter (F) is needed because the input source and the output and error targets all represent effectful code such as writing to the standard output, reading from a file, or passing data through concurrent streams.

Let's see some examples of how the abstract types of cats-effect can be used to describe the computation, when we cannot rely on IO itself!

The most basic operation is to delay the execution of some code that does not use the effect abstractions. This is how we wrap the Java process API, for example.

While with the original implementation of prox it was done by using the IO constructor:

IO {
    systemProcess.isAlive
}

with an arbitrary F we only need to require that it has an implementation of the Sync type class:

private class WrappedProcess[F[_] : Sync,  // ...

and then use the delay function:

Sync[F].delay {
    systemProcess.isAlive
}
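The key property of delay is that the wrapped code runs only when the resulting effect is executed, and it runs again on every execution. A small sketch of this, assuming cats-effect 2.x; countCall and DelayExample are hypothetical names, and the counter stands in for a side-effecting Java API call:

```scala
import java.util.concurrent.atomic.AtomicInteger

import cats.effect.{IO, Sync}

object DelayExample {
  // Hypothetical helper: suspends a side effect in an arbitrary F.
  def countCall[F[_]: Sync](counter: AtomicInteger): F[Int] =
    Sync[F].delay(counter.incrementAndGet())

  def run(): (Int, Int, Int) = {
    val counter = new AtomicInteger(0)
    val program = countCall[IO](counter) // nothing has been executed yet
    val before  = counter.get()
    val first   = program.unsafeRunSync() // runs the side effect
    val second  = program.unsafeRunSync() // runs it again
    (before, first, second)
  }
}
```

Here run() returns (0, 1, 2): building the program does not touch the counter, and each execution increments it once.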

Similarly, the Concurrent type class can be used to start a concurrent computation on a fiber:

Concurrent[F].start(stream.compile.toVector)
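As a self-contained illustration of start, the following sketch runs two computations on separate fibers and joins them. It assumes cats-effect 2.x; sumConcurrently and FiberExample are hypothetical names and not part of prox:

```scala
import cats.effect.{Concurrent, ContextShift, IO}
import cats.syntax.flatMap._
import cats.syntax.functor._

import scala.concurrent.ExecutionContext

object FiberExample {
  // Starts both computations on their own fibers, then waits for both.
  def sumConcurrently[F[_]: Concurrent](left: F[Int], right: F[Int]): F[Int] =
    for {
      leftFiber  <- Concurrent[F].start(left)
      rightFiber <- Concurrent[F].start(right)
      l          <- leftFiber.join
      r          <- rightFiber.join
    } yield l + r

  // Concurrent[IO] can only be derived when a ContextShift[IO] is in scope.
  implicit val contextShift: ContextShift[IO] =
    IO.contextShift(ExecutionContext.global)

  def run(): Int =
    sumConcurrently[IO](IO(20), IO(22)).unsafeRunSync()
}
```

Note that the generic function itself never asks for a ContextShift; only the concrete IO instance of Concurrent needs one.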

Type level

This would be it - except that we need one more thing because of the type level techniques described in the first post.

To understand the problem, let's see how the output redirection operator works. It is implemented as an extension method on the ProcessNode type:

implicit class ProcessNodeOutputRedirect[PN <: ProcessNode[_, _, _, NotRedirected, _]](processNode: PN) {
    def >[F[_], To, NewOut, NewOutResult, Result <: ProcessNode[_, _, _, Redirected, _]]
    (to: To)
    (implicit
     target: CanBeProcessOutputTarget.Aux[F, To, NewOut, NewOutResult],
     redirectOutput: RedirectOutput.Aux[F, PN, To, NewOut, NewOutResult, Result]): Result = {
      redirectOutput(processNode, to)
    }
}

This extension method basically just finds the appropriate type class implementation and then calls it to alter the process node and register the output redirection.

This code would compile, but we won't be able to use it. For example for the following code:

running <- (Process[IO]("echo", List("Hello world!")) > tempFile.toPath).start(blocker)

Compilation fails because the implicits cannot be resolved correctly. The exact error of course depends heavily on the context, but one example for the above line could be:

[error] prox/src/test/scala/io/github/vigoo/prox/ProcessSpecs.scala:95:63: diverging implicit expansion for type cats.effect.Concurrent[F]
[error] starting with method catsIorTConcurrent in object Concurrent
[error]         running <- (Process[IO]("echo", List("Hello world!")) > tempFile.toPath).start(blocker)

This does not really help in understanding the real problem though. As we have seen earlier, in this library the Process types have to be parameterized with the context as well, because they store their redirection logic within themselves. That's why we specify it explicitly in the example to be IO: Process[IO](...). What we would expect is that by tying F[_] to IO at the beginning, all the subsequent operations such as the > redirection would respect this and the context gets inferred to be IO everywhere in the expression.

The compiler cannot do this. If we check the definition of > again, we can see that no connection is expressed between the type PN (the actual process node type) and F, which is only used as a type parameter of the implicit parameters.

The fix is to link the two, and we have a technique exactly for this that I described earlier: the aux pattern.

First let's write some code that, at compile time, can "extract" the context type from a process node type:

trait ContextOf[PN] {
  type Context[_]
}

object ContextOf {
  type Aux[PN, F[_]] = ContextOf[PN] {
    type Context[_] = F[_]
  }

  def apply[PN <: ProcessNode[_, _, _, _, _], F[_]](implicit contextOf: ContextOf.Aux[PN, F]): Aux[PN, F] = contextOf

  implicit def contextOfProcess[F[_], Out, Err, OutResult, ErrResult, IRS <: RedirectionState, ORS <: RedirectionState, ERS <: RedirectionState]:
  Aux[Process[F, Out, Err, OutResult, ErrResult, IRS, ORS, ERS], F] =
    new ContextOf[Process[F, Out, Err, OutResult, ErrResult, IRS, ORS, ERS]] {
      override type Context[_] = F[_]
    }

  implicit def contextOfPipedProcess[
  F[_],
  Out, Err,
  PN1 <: ProcessNode[_, _, _, _, _],
  PN2 <: ProcessNode[_, _, _, _, _],
  IRS <: RedirectionState, ORS <: RedirectionState, ERS <: RedirectionState]:
  Aux[PipedProcess[F, Out, Err, Byte, PN1, PN2, IRS, ORS, ERS], F] =
    new ContextOf[PipedProcess[F, Out, Err, Byte, PN1, PN2, IRS, ORS, ERS]] {
      override type Context[_] = F[_]
    }
}

Both Process and PipedProcess have the context as their first type parameter. By creating the ContextOf type class and the corresponding Aux type we can extend the > operator to require such a connection (a way to get an F[_] context out of a type PN) at compile time. With the aux pattern the compiler unifies the type parameters, and the context type gets chained through all the subsequent calls as we desired:

def >[F[_], To, NewOut, NewOutResult, Result <: ProcessNode[_, _, _, Redirected, _]]
    (to: To)
    (implicit
     contextOf: ContextOf.Aux[PN, F],
     target: CanBeProcessOutputTarget.Aux[F, To, NewOut, NewOutResult],
     redirectOutput: RedirectOutput.Aux[F, PN, To, NewOut, NewOutResult, Result]): Result = {
      redirectOutput(processNode, to)
    }
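The same mechanism can be tried out in isolation. The sketch below is a heavily simplified model and not prox code: Node stands in for Process, ContextName is a made-up type class indexed by the context, and contextName plays the role of the > operator. It assumes Scala 2.12 or 2.13 and reuses the ContextOf definition from above unchanged:

```scala
import scala.language.higherKinds

object ContextOfExample {
  // Hypothetical stand-in for prox's Process: the context is the first type parameter.
  final case class Node[F[_], Out](command: String)

  trait ContextOf[PN] {
    type Context[_]
  }

  object ContextOf {
    type Aux[PN, F[_]] = ContextOf[PN] {
      type Context[_] = F[_]
    }

    implicit def contextOfNode[F[_], Out]: Aux[Node[F, Out], F] =
      new ContextOf[Node[F, Out]] {
        override type Context[_] = F[_]
      }
  }

  // Hypothetical type class that can only be resolved once F is known.
  trait ContextName[F[_]] { def name: String }

  object ContextName {
    implicit val optionName: ContextName[Option] =
      new ContextName[Option] { val name = "Option" }
    implicit val listName: ContextName[List] =
      new ContextName[List] { val name = "List" }
  }

  implicit class NodeOps[PN](node: PN) {
    // F is not mentioned in PN; the first implicit parameter ties the two together.
    def contextName[F[_]](implicit
                          contextOf: ContextOf.Aux[PN, F],
                          contextName: ContextName[F]): String =
      contextName.name
  }

  // No type argument is given to contextName: F comes from the node's type.
  val inferredForOption: String = Node[Option, String]("echo").contextName
  val inferredForList: String   = Node[List, String]("echo").contextName
}
```

The order of the implicit parameters matters: the compiler resolves them from left to right, so contextOf has to come first to fix F before the second instance is searched for.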

ZIO

Now that everything is in place, we can try out whether prox really works with other effect libraries such as ZIO.

ZIO has a compatibility layer for cats-effect: an implementation of the type classes cats-effect provides. It lives in a separate library called zio-interop-cats.
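To see what this layer gives us, the sketch below runs the same generic function with both cats-effect's IO and ZIO's Task. It is an illustration under assumptions: it targets ZIO 1.0.x with zio-interop-cats 2.x and cats-effect 2.x, which are newer than the versions available when this post was written, so names such as Runtime.default may differ in other versions. javaVersion and InteropExample are hypothetical names:

```scala
import cats.effect.{IO, Sync}
import zio.{Runtime, Task}
import zio.interop.catz._ // brings the cats-effect instances for ZIO into scope

object InteropExample {
  // Written only against the Sync type class, knows nothing about IO or ZIO.
  def javaVersion[F[_]: Sync]: F[String] =
    Sync[F].delay(System.getProperty("java.version"))

  // F = cats.effect.IO
  val withCatsIO: String = javaVersion[IO].unsafeRunSync()

  // F = zio.Task, executed with ZIO's default runtime (ZIO 1.0.x API)
  val withZio: String = Runtime.default.unsafeRun(javaVersion[Task])
}
```

Both values end up holding the same string; the only difference is which effect type interpreted the generic code.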

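For running processes with prox we can use two variants of the ZIO type: RIO[R, A], which is an alias for ZIO[R, Throwable, A], or Task[A], which is an alias for ZIO[Any, Throwable, A] and fits when the environment parameter is not needed.

In fact, assuming the correct context, this only means switching IO to RIO or Task in the type parameter for Process: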

import zio.interop.catz._

Blocker[RIO[Console, ?]].use { blocker =>
  for {
      // ...
      _ <- console.putStrLn("Starting external process...")
      _ <- (Process[Task]("echo", List("Hello world!")) > tempFile.toPath).start(blocker)
      // ...
  } yield ()
}

A nice way to have everything set up for this is to use the interop library's CatsApp trait as an entrypoint for the application.

This brings all the necessary implicits in scope and requires you to implement the following function as the entry point of the application:

def run(args: List[String]): ZIO[Environment, Nothing, Int]