vigoo's software development blog

prox part 1 - type level programming

Posted on February 10, 2019

Intro

I started writing prox at the end of 2017 for two reasons. First, I never liked any of the existing solutions for running external processes and capturing their input/output streams. Second, I had just returned from the scala.io conference full of inspiration; I wanted to try out some techniques and libraries, and this seemed to be a nice small project for that.

Since then, prox has proved to be useful; we are using it at Prezi in all our Scala projects where we have to deal with external processes. The last stable version was created last October, after cats-effect 1.0 and fs2 1.0 were released.

This is the first part of a series of blog posts dedicated to this library. In this first one I'm going to talk about how shapeless and type level programming techniques are used to create a strongly typed interface for starting system processes. In future posts I will explore replacing its dependencies, such as using akka-streams instead of fs2 or ZIO instead of cats-effect. These different versions will be a good opportunity to do some performance comparison, and to close the series by creating a new version of the library which is easier to use in the alternative environments.

Limiting redirection

When I started writing the library I wanted to explore how I can express some strict constraints on the type level.

In prox 0.2.1 a single system process is described by the following type:

class Process[Out, Err, OutResult, ErrResult, 
              IRS <: RedirectionState, ORS <: RedirectionState, ERS <: RedirectionState](
    val command: String,
    val arguments: List[String],
    val workingDirectory: Option[Path],
    val inputSource: ProcessInputSource,
    val outputTarget: ProcessOutputTarget[Out, OutResult],
    val errorTarget: ProcessErrorTarget[Err, ErrResult],
    val environmentVariables: Map[String, String])
    extends ProcessNode[Out, Err, IRS, ORS, ERS] {
        // ...
}

Let's focus first on the requirement that each stream can be redirected at most once. This is encoded by the IRS, ORS and ERS type parameters, which all have to be subtypes of RedirectionState. RedirectionState is a phantom type; no values of this type are ever created, it is only used in type signatures to encode whether each of the three streams is already redirected or not:

/** Phantom type representing the redirection state of a process */
sealed trait RedirectionState

/** Indicates that the given channel is not redirected yet */
trait NotRedirected extends RedirectionState

/** Indicates that the given channel has already been redirected */
trait Redirected extends RedirectionState

So for example with a simplified model of a process, Process[IRS <: RedirectionState, ORS <: RedirectionState, ERS <: RedirectionState], using the output redirection operator > would change the types in the following way:

val p1: Process[NotRedirected, NotRedirected, NotRedirected] = ???
val p2: Process[NotRedirected, Redirected, NotRedirected] = p1 > (home / "tmp" / "out.txt")
val p3 = p2 > (home / "tmp" / "another.txt") // THIS MUST NOT COMPILE

How can we restrict the redirect function to only work on Process[_, NotRedirected, _]? We can define it as an extension method with an implicit class (once again this is a simplified version focusing only on the redirection state handling):

  implicit class ProcessNodeOutputRedirect[
      IRS <: RedirectionState, 
      ERS <: RedirectionState, 
      PN <: Process[IRS, NotRedirected, ERS]](process: PN) {

    def >[To](to: To)(implicit target: CanBeProcessOutputTarget[To]): Process[IRS, Redirected, ERS] = ???
  }

By forcing the ORS type parameter to be NotRedirected and setting it to Redirected in the result type, we can guarantee that this function can only be called on a process that does not have its output redirected yet. The target of the redirection is extensible through the CanBeProcessOutputTarget type class, as we will see later.
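The same idea can be tried out in isolation. The following is a minimal sketch, not the prox code itself: Proc is a hypothetical, heavily simplified stand-in for Process that tracks only the output redirection state and takes a plain String as the target.

```scala
object PhantomRedirectSketch {
  // Phantom types: never instantiated, only used as type arguments
  sealed trait RedirectionState
  trait NotRedirected extends RedirectionState
  trait Redirected extends RedirectionState

  // Hypothetical simplified process, tracking only the output redirection state
  final case class Proc[ORS <: RedirectionState](command: String, outputFile: Option[String])

  // The extension method exists only for Proc[NotRedirected]
  implicit class OutputRedirect(private val process: Proc[NotRedirected]) {
    def >(path: String): Proc[Redirected] =
      Proc[Redirected](process.command, Some(path))
  }

  val p1: Proc[NotRedirected] = Proc[NotRedirected]("ls", None)
  val p2: Proc[Redirected] = p1 > "out.txt"

  // Does not compile, because no > is available on Proc[Redirected]:
  // val p3 = p2 > "another.txt"
}
```

Because Proc is invariant in its type parameter, the implicit class cannot be applied to a Proc[Redirected], so the second redirection is rejected at compile time.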

Dependent types

Reality is much more complicated, because of process piping and because the process types encode the redirection result types too. Let's get back to our > function and see how we could modify it so it works with piped processes too. But first, how is process piping encoded in this library?

Two processes connected through a pipe are represented by the PipedProcess class. Both Process and PipedProcess implement the following trait:

sealed trait ProcessNode[Out, Err, IRS <: RedirectionState, ORS <: RedirectionState, ERS <: RedirectionState]

We've already seen Process. PipedProcess is a bit more complicated:

class PipedProcess[Out, Err, PN1Out, 
                   PN1 <: ProcessNode[_, _, _, _, _], 
                   PN2 <: ProcessNode[_, _, _, _, _], 
                   IRS <: RedirectionState, ORS <: RedirectionState, ERS <: RedirectionState]
    (val from: PN1, val createTo: PipeConstruction[PN1Out] => PN2)
    extends ProcessNode[Out, Err, IRS, ORS, ERS] {
        // ...
}

To make > work on both, we can start by modifying its definition to work on any ProcessNode not just Process (omitting the output type params for now):

implicit class ProcessNodeOutputRedirect[
    IRS <: RedirectionState, 
    ERS <: RedirectionState, 
    PN <: ProcessNode[IRS, NotRedirected, ERS]](process: PN) {

  def >[To](to: To)(implicit target: CanBeProcessOutputTarget[To]): ProcessNode[IRS, Redirected, ERS] = ???
}

This has a serious problem though. The output type is ProcessNode and not the "real" process type, which means that we lose type information and all the other dependent typed operations will not work. We have to make the result type depend on the input!

We may try to use the RedirectOutput type class like this:

implicit class ProcessNodeOutputRedirect[
    IRS <: RedirectionState, 
    ERS <: RedirectionState, 
    PN <: ProcessNode[IRS, NotRedirected, ERS]](process: PN) {

  def >[To](to: To)
           (implicit target: CanBeProcessOutputTarget[To],
            redirectOutput: RedirectOutput[PN, To]): redirectOutput.Result = redirectOutput(process, to)
}

Here the result (redirectOutput.Result) is a path dependent type. This may work in some simple cases, but it has two serious issues.

The Aux pattern, used heavily in the shapeless library, provides a nice way to fix both problems. We start by defining a type class for describing the operation, in this case redirecting the output channel of a process:

trait RedirectOutput[PN <: ProcessNode[_, NotRedirected, _], To] {
    type Result <: ProcessNode[_, Redirected, _]

    def apply(process: PN, to: To)(implicit target: CanBeProcessOutputTarget[To]): Result
}

object RedirectOutput {
    type Aux[PN <: ProcessNode[_, NotRedirected, _], To, Result0] = 
        RedirectOutput[PN, To] { type Result = Result0 }

    // ... type class instances
}

The type class itself is straightforward. We have to implement it for both Process and PipedProcess and set the Result type accordingly, then implement apply, which sets up the actual redirection. But what is the Aux type for?

It solves the problems with the path dependent version if we use it like this:

implicit class ProcessNodeOutputRedirect[
    IRS <: RedirectionState, 
    ERS <: RedirectionState, 
    PN <: ProcessNode[IRS, NotRedirected, ERS]](process: PN) {

  def >[To, Result <: ProcessNode[_, Redirected, _]](to: To)
           (implicit target: CanBeProcessOutputTarget[To],
            redirectOutput: RedirectOutput.Aux[PN, To, Result]): Result = redirectOutput(process, to)
}

By lifting the Result from the type class instance to a type parameter, the compiler can now "extract" the calculated type from redirectOutput.Result to the > function's Result type parameter and use it directly, either in further type requirements or, as we do here, in the result type.

This is the basic pattern used for all the operations in prox. You can check Luigi's short introduction to the Aux pattern for a more detailed explanation.
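To see the pattern work end to end, here is a minimal, self-contained sketch. It is not the prox implementation: Node, Proc and Piped are hypothetical simplified stand-ins that track only the output redirection state, and the redirection target is a plain String instead of going through CanBeProcessOutputTarget.

```scala
object AuxPatternSketch {
  sealed trait RedirectionState
  trait NotRedirected extends RedirectionState
  trait Redirected extends RedirectionState

  // Hypothetical simplified process graph, tracking only the output state
  sealed trait Node[ORS <: RedirectionState]
  final case class Proc[ORS <: RedirectionState](command: String, outputFile: Option[String])
      extends Node[ORS]
  final case class Piped[PN1 <: Node[_], ORS <: RedirectionState](from: PN1, to: Proc[ORS])
      extends Node[ORS]

  // The type class computes the concrete result type for each node type
  trait RedirectOutput[PN <: Node[NotRedirected]] {
    type Result <: Node[Redirected]
    def apply(node: PN, path: String): Result
  }

  object RedirectOutput {
    type Aux[PN <: Node[NotRedirected], Result0 <: Node[Redirected]] =
      RedirectOutput[PN] { type Result = Result0 }

    implicit val forProc: Aux[Proc[NotRedirected], Proc[Redirected]] =
      new RedirectOutput[Proc[NotRedirected]] {
        type Result = Proc[Redirected]
        def apply(node: Proc[NotRedirected], path: String): Proc[Redirected] =
          Proc[Redirected](node.command, Some(path))
      }

    implicit def forPiped[PN1 <: Node[_]]: Aux[Piped[PN1, NotRedirected], Piped[PN1, Redirected]] =
      new RedirectOutput[Piped[PN1, NotRedirected]] {
        type Result = Piped[PN1, Redirected]
        def apply(node: Piped[PN1, NotRedirected], path: String): Piped[PN1, Redirected] =
          Piped[PN1, Redirected](node.from, Proc[Redirected](node.to.command, Some(path)))
      }
  }

  implicit class OutputRedirectOps[PN <: Node[NotRedirected]](private val node: PN) {
    // Result is not given by the caller; it is filled in by the implicit instance
    def >[Result <: Node[Redirected]](path: String)(
        implicit redirect: RedirectOutput.Aux[PN, Result]): Result =
      redirect(node, path)
  }

  // Inferred as Proc[Redirected]
  val single = Proc[NotRedirected]("ls", None) > "files.txt"

  // The first process writes to the pipe, so it is marked Redirected here
  val pipe = Piped[Proc[Redirected], NotRedirected](
    Proc[Redirected]("ls", None), Proc[NotRedirected]("wc", None))

  // Inferred as Piped[Proc[Redirected], Redirected], not just Node[Redirected]
  val redirectedPipe = pipe > "count.txt"
}
```

The important part is that single and redirectedPipe keep their concrete types, so fields such as redirectedPipe.from remain accessible without any casting.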

Starting the processes

So far we just combined purely functional data structures in a complicated way. The result value may encode the launching of several system processes that are connected via pipes to each other and possibly other streams as we will see.

When we eventually decide to start these processes, we need a way to observe their status, wait for them to stop, get their exit code, and to access the data sent to the output streams if they were redirected. And we need this per process, while launching the whole process graph in a single step.

First let's model a single running process:

trait RunningProcess[Out, OutResult, ErrResult] {
    def isAlive: IO[Boolean]
    def waitForExit(): IO[ProcessResult[OutResult, ErrResult]]
    def terminate(): IO[ProcessResult[OutResult, ErrResult]]
}

and ProcessResult that represents an already terminated process:

case class ProcessResult[OutResult, ErrResult](
    exitCode: Int, 
    fullOutput: OutResult, 
    fullError: ErrResult
)

Now we need to define a start extension method on ProcessNode that somehow returns one well typed RunningProcess for each system process that it starts.

Let's forget for a second about having multiple processes piped together and just consider the single process case. For that, we would need something like this (the Out parameter is needed only for piping so I omitted it):

def start: IO[RunningProcess[OutResult, ErrResult]]

Now we can see why Process has those additional type parameters. It is not enough to encode whether the output and error channels were redirected or not; we also have to encode the expected result type of redirecting them. By storing these types in type parameters of Process, and by using the pattern described in the previous section, the result type can depend on what we redirected the process to.

Let's see some examples of what this means!

Target | Result type
A file system path | The result type is Unit, the redirection happens on OS level
Sink | The result type is Unit, only the sink's side effect matters
Pipe with monoid elem type | The stream is folded by the monoid, the result type is T
Pipe with non-monoid elem type | The stream captures the elements in a vector, the result type is Vector[T]
Custom fold function | The result type is the function's result type

The CanBeProcessOutputTarget type class we've seen earlier defines both the stream element type and the result type:

trait CanBeProcessOutputTarget[To] {
  /** Output stream element type */
  type Out
  /** Result type of running the output stream */
  type OutResult

  def apply(to: To): ProcessOutputTarget[Out, OutResult]
}

ProcessOutputTarget contains the actual IO code to build the redirection of the streams, I won't get into details in this post. Note that there are similar type classes for error and input redirection too.
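As an illustration of how a type class with type members can choose the result type, here is a small self-contained sketch of the "monoid vs. non-monoid" rows. Everything in it is hypothetical: Capture, CanBeTarget, TargetAux and the tiny Monoid trait are made up for the example, the "stream" is just a List, and the low priority implicit trick is only one possible way to pick between the two cases; the post does not show how prox does it.

```scala
object OutputTargetSketch {
  // Minimal stand-in for a Monoid type class
  trait Monoid[A] {
    def empty: A
    def combine(x: A, y: A): A
  }
  object Monoid {
    implicit val stringMonoid: Monoid[String] = new Monoid[String] {
      def empty: String = ""
      def combine(x: String, y: String): String = x + y
    }
  }

  // Hypothetical target meaning "capture the elements of type T"
  final case class Capture[T]()

  trait CanBeTarget[To] {
    type Elem
    type Result
    def run(elems: List[Elem]): Result
  }

  type TargetAux[To, E, R] = CanBeTarget[To] { type Elem = E; type Result = R }

  // Fallback: without a Monoid the elements are collected into a Vector
  trait LowPriorityTargets {
    implicit def collectToVector[T]: TargetAux[Capture[T], T, Vector[T]] =
      new CanBeTarget[Capture[T]] {
        type Elem = T
        type Result = Vector[T]
        def run(elems: List[T]): Vector[T] = elems.toVector
      }
  }

  object CanBeTarget extends LowPriorityTargets {
    // Preferred when a Monoid[T] is available: fold the elements
    implicit def foldWithMonoid[T](implicit monoid: Monoid[T]): TargetAux[Capture[T], T, T] =
      new CanBeTarget[Capture[T]] {
        type Elem = T
        type Result = T
        def run(elems: List[T]): T = elems.foldLeft(monoid.empty)(monoid.combine)
      }
  }

  // R is computed by the selected type class instance
  def runTo[To, E, R](to: To, elems: List[E])(implicit target: TargetAux[To, E, R]): R =
    target.run(elems)

  val folded = runTo(Capture[String](), List("a", "b", "c")) // String
  val collected = runTo(Capture[Int](), List(1, 2, 3))       // Vector[Int]
}
```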

For two processes piped together we have to provide two RunningProcess instances with the proper result type parameters. So we can see that it is not enough that the redirection stores the result type in the process type, the start method must be dependent typed too.

One way to encode this in the type system would be something like this (simplified):

val p1 = Process()
val p2 = Process()
val p3 = Process()

val rp1: IO[RunningProcess] = p1.start
val rp2: IO[(RunningProcess, RunningProcess)] = (p1 | p2).start
val rp3: IO[(RunningProcess, RunningProcess, RunningProcess)] = (p1 | p2 | p3).start

We encode piped processes with tuples of RunningProcess and a single process with a single RunningProcess. To implement this we can make use of the shapeless library's HList implementation.

HLists are heterogeneous lists; basically similar to a tuple, but with all the "usual" list-like functions implemented as dependent typed functions. Its type describes the types of all its elements, and you can split it into head and tail, append two of them, etc. And we can do it both on the type level (computing the result type of appending two HLists, for example) and on the value level (appending the two values, creating a third HList value).
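To make the "type level and value level at the same time" idea concrete, here is a tiny hand-rolled HList with an append operation. This is only a sketch for illustration; it is not how shapeless is implemented, and the names HCons, prependToNil, prependToCons and append are made up for this example (shapeless provides its own HList and Prepend).

```scala
object HListSketch {
  sealed trait HList
  final case class HCons[H, T <: HList](head: H, tail: T) extends HList
  sealed trait HNil extends HList
  case object HNil extends HNil

  // Type class computing the type (Out) and the value (apply) of appending R to L
  trait Prepend[L <: HList, R <: HList] {
    type Out <: HList
    def apply(left: L, right: R): Out
  }

  object Prepend {
    type Aux[L <: HList, R <: HList, Out0 <: HList] = Prepend[L, R] { type Out = Out0 }

    // HNil ++ R = R
    implicit def prependToNil[R <: HList]: Aux[HNil, R, R] =
      new Prepend[HNil, R] {
        type Out = R
        def apply(left: HNil, right: R): R = right
      }

    // (H :: T) ++ R = H :: (T ++ R)
    implicit def prependToCons[H, T <: HList, R <: HList, TailOut <: HList](
        implicit prependTail: Aux[T, R, TailOut]): Aux[HCons[H, T], R, HCons[H, TailOut]] =
      new Prepend[HCons[H, T], R] {
        type Out = HCons[H, TailOut]
        def apply(left: HCons[H, T], right: R): HCons[H, TailOut] =
          HCons(left.head, prependTail(left.tail, right))
      }
  }

  def append[L <: HList, R <: HList, Out <: HList](left: L, right: R)(
      implicit prepend: Prepend.Aux[L, R, Out]): Out = prepend(left, right)

  val hnil: HNil = HNil
  val first: HCons[Int, HCons[String, HNil]] = HCons(1, HCons("two", hnil))
  val second: HCons[Boolean, HNil] = HCons(true, hnil)

  // The compiler computes HCons[Int, HCons[String, HCons[Boolean, HNil]]]
  val both = append(first, second)

  // Each element keeps its own static type
  val third: Boolean = both.tail.tail.head
}
```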

We can implement the start method more easily by building an HList, while still keeping the desired interface, as shapeless implements a conversion from HList to tuples.

We can define two separate start functions, one producing an HList and another producing tuples (IO related parameters omitted):

def start[RP](implicit start: Start.Aux[PN, RP, _]): IO[RP] = ???
def startHL[RPL <: HList](implicit start: Start.Aux[PN, _, RPL]): IO[RPL] = ???

The Start type class calculates both the tupled and the HList version's result type. The implementation's responsibility is to start the actual system processes and wire the streams together.

The interesting part is how we use type level calculations from shapeless to calculate the tuple and HList types for piped processes. This is all done using the technique I described earlier, but it may look a bit shocking at first. Let's take a look!

implicit def startPipedProcess[
  Out, Err,
  PN1 <: ProcessNode[_, _, _, _, _],
  PN2 <: ProcessNode[_, _, _, _, _],
  IRS <: RedirectionState, ORS <: RedirectionState, ERS <: RedirectionState,
  RP1, RPL1 <: HList, RP1Last <: RunningProcess[_, _, _],
  RP2, RPL2 <: HList, RP2Head <: RunningProcess[_, _, _], RP2Tail <: HList,
  RPT, RPL <: HList]
  (implicit
   start1: Start.Aux[PN1, RP1, RPL1],
   start2: Start.Aux[PN2, RP2, RPL2],
   last1: Last.Aux[RPL1, RP1Last],
   rp1LastType: RP1Last <:< RunningProcess[Byte, _, _],
   hcons2: IsHCons.Aux[RPL2, RP2Head, RP2Tail],
   prepend: Prepend.Aux[RPL1, RPL2, RPL],
   tupler: Tupler.Aux[RPL, RPT]):
  Aux[PipedProcess[Out, Err, Byte, PN1, PN2, IRS, ORS, ERS], RPT, RPL] =

    new Start[PipedProcess[Out, Err, Byte, PN1, PN2, IRS, ORS, ERS]] {
      override type RunningProcesses = RPT
      override type RunningProcessList = RPL

      // ...
    }

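The way to parse this is to follow the type level computations performed through the Aux types in the implicit parameter list. start1 and start2 calculate the running process types of the two piped nodes, both as tuples (RP1, RP2) and as HLists (RPL1, RPL2). last1 extracts the type of the last element of the first list as RP1Last, and rp1LastType is evidence that this last running process has Byte as its Out type parameter. hcons2 splits the second list into its head (RP2Head) and tail (RP2Tail). Finally, prepend concatenates the two lists into RPL, and tupler converts RPL to the tuple type RPT.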

The compiler performs the type level calculations, and we can use the result types RPT and RPL to actually implement the Start type class. This is the most complicated type level calculation in the library.

Final thoughts

As we've seen, Scala's type system can bring us quite far in expressing a dependent typed interface. On the other hand writing and reading code in this style is really hard, and if things go wrong, decoding the compiler's error messages is not an easy task either. This is a serious tradeoff that has to be considered and in many cases a more dynamic but much more readable and maintainable approach can be better.

With prox I explicitly wanted to explore these features of the Scala language.

In the next posts we will ignore the type level parts of the library and focus on different streaming and effect libraries.