Asynchronous Processing

Having dealt with handling expected errors, we will now take a short tour of how Sentinel does asynchronous processing. The basic rationale for using asynchronous processing in Sentinel is to ensure that multiple HTTP requests can be handled at the same time. A single HTTP request may involve writing to and/or reading from the database several times, so most asynchronous functions in Sentinel are database-related operations.

Note

This guide assumes readers are already familiar with the Future type and only attempts to explain how it is used in Sentinel. For a more comprehensive guide to Future, we find the official overview a good starting point.

Sentinel uses the Future type defined in the standard library for its asynchronous functions. In some ways, this allows us to leverage the type system to make our functions composable, similar to how we handled errors earlier using the disjunction type \/.

Future with for comprehensions

Like the Option, Either, and \/ types we have seen previously, Future is composable. You can, for example, use it inside a for comprehension:

import scala.concurrent._

// Remember that we need an execution context
implicit val context = ExecutionContext.global

def calc1(): Future[Int] = Future { ... }
def calc2(): Future[Int] = Future { ... }
def calc3(x: Int, y: Int): Future[Int] = Future { ... }

// Launch separate computation threads
val value1 = calc1()
val value2 = calc2()

val finalResult = for {
    firstResult <- value1
    secondResult <- value2
    thirdResult <- calc3(firstResult, secondResult)
} yield thirdResult

The code block above will start a computation for value1 and value2 in parallel threads and wait until they both return their results, before using them as arguments for calc3.

The code also illustrates how using Future with for comprehensions requires a little more care. Notice that we invoked calc1 and calc2 outside of the for comprehension block. This is intentional, because function calls inside the block are made sequentially. Had we written finalResult like this:

...

val finalResult = for {
    firstResult <- calc1()
    secondResult <- calc2()
    thirdResult <- calc3(firstResult, secondResult)
} yield thirdResult

then it would not matter that our code is written inside a Future. calc2 would only be invoked after calc1 has completed, so the computations run sequentially, defeating the purpose of using Future in the first place.
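The difference in invocation order can be made visible with a small, self-contained sketch. This is not Sentinel code: CallLog, run, and the stand-in calc1 and calc2 below are hypothetical names introduced only for illustration. Here calc1 returns a Future that stays pending until a Promise is completed, so we can inspect which functions have been invoked while it is still unfinished.

```scala
import scala.concurrent._
import scala.concurrent.duration._

// Hypothetical helper: a thread-safe record of which functions were invoked.
final class CallLog {
  private var entries = Vector.empty[String]
  def record(name: String): Unit = synchronized { entries = entries :+ name }
  def snapshot: Vector[String] = synchronized { entries }
}

object CallOrderDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Returns the calls observed while calc1 is still pending,
  // together with the final result of the for comprehension.
  def run(startEagerly: Boolean): (Vector[String], Int) = {
    val log = new CallLog
    val gate = Promise[Int]() // calc1 stays pending until this is completed

    def calc1(): Future[Int] = { log.record("calc1"); gate.future }
    def calc2(): Future[Int] = { log.record("calc2"); Future.successful(2) }

    val sum: Future[Int] =
      if (startEagerly) {
        // Both functions are invoked before the for comprehension
        val value1 = calc1()
        val value2 = calc2()
        for { a <- value1; b <- value2 } yield a + b
      } else {
        // calc2 is only invoked once calc1 has completed
        for { a <- calc1(); b <- calc2() } yield a + b
      }

    val seenWhilePending = log.snapshot
    gate.success(1) // let calc1 finish
    (seenWhilePending, Await.result(sum, 5.seconds))
  }
}
```

Under these assumptions, CallOrderDemo.run(startEagerly = true) observes both calc1 and calc2 while calc1 is still pending, whereas run(startEagerly = false) observes only calc1. Both variants eventually yield the same sum.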

In some cases the for comprehension does allow for a value declaration inside itself (i.e. using the = operator instead of <-). This requires that the first statement inside the block is an <- statement using the abstract type the block intends to return. Since we are using Future, this means the first statement should be a <- statement from a Future[_] type. Take the following example:

...

def calc0(): Future[Unit] = Future { ... }
def calc1(): Future[Int] = Future { ... }
def calc2(): Future[Int] = Future { ... }
def calc3(x: Int, y: Int): Future[Int] = Future { ... }

val finalResult = for {
    preResult <- calc0()
    // Here ``calc1`` and ``calc2`` get executed asynchronously
    value1 = calc1()
    value2 = calc2()
    firstResult <- value1
    secondResult <- value2
    thirdResult <- calc3(firstResult, secondResult)
} yield thirdResult

The code block above also computes value1 and value2 asynchronously, similar to our first example.
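To see why value declarations inside the block still start both computations right away, it can help to write out by hand roughly what such a for comprehension means. The sketch below is an approximation and not the compiler's exact expansion (which threads the declared values through map calls using tuples). The bodies of the calc functions are hypothetical stand-ins for the elided ones.

```scala
import scala.concurrent._
import scala.concurrent.duration._

object ValueDeclarationDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-ins for the elided calc functions
  def calc0(): Future[Unit] = Future(())
  def calc1(): Future[Int] = Future(20)
  def calc2(): Future[Int] = Future(22)
  def calc3(x: Int, y: Int): Future[Int] = Future(x + y)

  // The for comprehension with value declarations
  def withFor(): Future[Int] = for {
    _ <- calc0()
    value1 = calc1()
    value2 = calc2()
    first <- value1
    second <- value2
    third <- calc3(first, second)
  } yield third

  // Roughly equivalent hand-written version: once calc0 completes,
  // calc1 and calc2 are both invoked before either result is awaited.
  def byHand(): Future[Int] =
    calc0().flatMap { _ =>
      val value1 = calc1()
      val value2 = calc2()
      value1.flatMap(first => value2.flatMap(second => calc3(first, second)))
    }

  def main(args: Array[String]): Unit = {
    println(Await.result(withFor(), 5.seconds))
    println(Await.result(byHand(), 5.seconds))
  }
}
```

Both versions produce the same value. The hand-written form makes it explicit that the two value declarations run inside the callback of calc0, one after the other, without waiting on each other's results.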

Combining Future and Perhaps

In Sentinel, Future is often combined with the Perhaps type we have defined earlier. Conceptually, this means that there are cases where Sentinel invokes a function asynchronously that may or may not return its expected type. A function with the following signature, for example:

def storeUpload(contents: Array[Byte]): Future[Perhaps[DatabaseId]] = { ... }

is expected to be executed asynchronously. In this case, it is a function to store user uploads which will return the database ID of the stored file. There could be different reasons for wrapping the database ID inside Perhaps. One is that we may want to tell users when they are uploading files they have previously uploaded, so that we can save disk space and the user does not store the same data twice.

Naturally there are still cases where we do not need our results being wrapped inside Future, or Perhaps, or even both.

Consider our earlier extractJson function. This is a function that we expect to execute very early upon user upload. Does it make sense to wrap it inside a Future? It depends on how you set up your processing, of course. But it is easy to imagine that we want to ensure that the data the user uploads can indeed be parsed into JSON before doing anything else. In this case, we would only need to wrap the return value inside a Perhaps and not a Future, since no other processing would be done in parallel while we are doing validation.

On the other hand, methods that interact with the database directly are often wrapped only inside a Future and not Perhaps. An example would be a function storing sample data parsed from the JSON record:

def storeSamples(samples: Seq[Samples]): Future[Unit] = { ... }

This is because, in most cases, a database connection failure is not something the user can recover from, so there is little point in reporting it to them. We should indeed anticipate that the database connection may fail from time to time, but such failures belong in the server logs and not in the response shown to the user, so we do not use Perhaps here.

Tip

For asynchronous methods where the error is not something the user can act on, we should let the Future fail. There is a built-in check in Sentinel that captures these Future failures and converts them into a general HTTP 500 Internal Server Error for the user.
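The general idea of turning a failed Future into a generic error response can be sketched with the standard library's recover method. This is only an illustration and not Sentinel's actual built-in check: HttpResponse, respond, and the dbUp flag are hypothetical, and the simplified storeSamples merely simulates a database call.

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.control.NonFatal

object FailureToResponseDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical, simplified response type (not Sentinel's)
  final case class HttpResponse(status: Int, body: String)

  // Simulated database call: fails when the (hypothetical) dbUp flag is false
  def storeSamples(samples: Seq[String], dbUp: Boolean): Future[Unit] =
    if (dbUp) Future.successful(())
    else Future.failed(new RuntimeException("database connection lost"))

  // The Future is allowed to fail; the failure is translated in one place.
  def respond(samples: Seq[String], dbUp: Boolean): Future[HttpResponse] =
    storeSamples(samples, dbUp)
      .map(_ => HttpResponse(201, "Created"))
      .recover {
        // The exception details would go to the server log,
        // while the user only sees a generic error.
        case NonFatal(_) => HttpResponse(500, "Internal Server Error")
      }

  def main(args: Array[String]): Unit = {
    println(Await.result(respond(Seq("s1"), dbUp = true), 5.seconds))
    println(Await.result(respond(Seq("s1"), dbUp = false), 5.seconds))
  }
}
```

The point of the design is that storeSamples itself stays free of any user-facing error type. Only the outermost layer decides what a failed Future looks like to the user.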

The fact that not all methods return Future, or Perhaps, or even Future[Perhaps] is something we need to take into account when composing these functions. We saw earlier that we can use for comprehensions for a series of calls that all return Perhaps, or a series of calls that all return Future in some form.

This is not the case when composing functions of these different abstract types, however. Let’s say we have these functions that we wish to compose into a single processing step:

def extractMetrics(contents: Array[Byte]): Perhaps[Metrics] = { ... }
def storeMetrics(metrics: Metrics): Future[DatabaseId] = { ... }
def storeUpload(contents: Array[Byte]): Future[Perhaps[DatabaseId]] = { ... }

Then this will not work because they all return different types:

def processUpload(contents: Array[Byte]) = for {
    metrics <- extractMetrics(contents)
    metricsId <- storeMetrics(metrics)
    fileId <- storeUpload(contents)
} yield (metricsId, fileId)

Important

Not only will the code above not compile, but it would also launch all the Futures sequentially.

A solution is to make these functions return the same type. This does not necessarily mean we have to change the functions themselves. After all, we have seen that we cannot force all functions to use Future or Perhaps. Not only is this conceptually wrong, it is also impractical to expect all functions we write to use the same abstract type. What we want is to somehow ‘lift’ the return values of the functions into a common return type, but only when we want to compose them. This way, functions can remain as they are yet still be composed with others when needed.

Lifting Into Future[Perhaps]

What should we use then as the common type? A good candidate is the Future[Perhaps[T]] type. It can be interpreted as the type of values that are computed asynchronously, with the possibility of returning an ApiPayload to be displayed to the user. Recall that in our case, Perhaps[T] is an alias for the disjunction type ApiPayload \/ T. Future[Perhaps[T]] is then an alias for Future[ApiPayload \/ T].

Note

Why not Perhaps[Future[T]] instead? Since this type is an alias for ApiPayload \/ Future[T], we can interpret it as types whose value when failing is ApiPayload and when successful is an asynchronous computation returning T. In other words, it encodes the types whose error value is not computed asynchronously. This distinction does not really make sense in practice. It sort of means that we only do the asynchronous computation when we know we will not get any failures, but we could not have known this prior to the computation itself.

How do we lift into Future[Perhaps]? There are two cases we need to consider: lifting from Perhaps types and lifting from Future types. From Perhaps types, we can simply wrap the value using Future.successful. This function, provided by the standard library, is precisely for lifting values into a Future without launching the parallel computation that Future normally does. In other words, it creates an already completed Future without the unnecessary work of launching one.

Future.successful can be used like so:

// A simple call to our ``extractMetrics`` function
val metrics: Perhaps[Metrics] = extractMetrics(...)

// Lifting the call into a ``Future``
val futureMetrics: Future[Perhaps[Metrics]] =
    Future.successful(extractMetrics(...))

For the second case of lifting Future into Future[Perhaps], we can simply use Future.map to lift the results inside it. Essentially, we then only lift the inner type of Future into Perhaps:

// A call to launch our ``storeMetrics`` function
val dbId: Future[DatabaseId] = storeMetrics(...)

// Lifting the ``DatabaseId`` to ``Perhaps[DatabaseId]``
val futureDbId: Future[Perhaps[DatabaseId]] =
    dbId.map(id => \/-(id))
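For readers who want to try both lifting cases without a Sentinel or scalaz setup, here is a minimal sketch using only the standard library. It rests on a few assumptions: Either stands in for the \/ disjunction type, String stands in for ApiPayload, Int and Long stand in for Metrics and DatabaseId, and liftPerhaps and liftFuture are hypothetical helper names.

```scala
import scala.concurrent._
import scala.concurrent.duration._

object LiftingDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Stand-in alias: Either replaces \/, String replaces ApiPayload
  type Perhaps[T] = Either[String, T]

  // Simplified stand-ins for the functions used in this guide
  def extractMetrics(contents: Array[Byte]): Perhaps[Int] =
    if (contents.isEmpty) Left("empty upload") else Right(contents.length)

  def storeMetrics(metrics: Int): Future[Long] = Future(metrics.toLong * 10)

  // Case 1: Perhaps[T] to Future[Perhaps[T]], wrapped in a completed Future
  def liftPerhaps[T](value: Perhaps[T]): Future[Perhaps[T]] =
    Future.successful(value)

  // Case 2: Future[T] to Future[Perhaps[T]], a success becomes the right side
  def liftFuture[T](value: Future[T]): Future[Perhaps[T]] =
    value.map[Perhaps[T]](result => Right(result))

  def main(args: Array[String]): Unit = {
    val contents = Array[Byte](1, 2, 3)
    println(Await.result(liftPerhaps(extractMetrics(contents)), 5.seconds))
    println(Await.result(liftFuture(storeMetrics(3)), 5.seconds))
  }
}
```

Note that a failed Future stays a failed Future after liftFuture. Only successful results are moved into the right side of the disjunction.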

Now, having covered both cases, let’s make our earlier for comprehension work. We will also define UploadId, a helper case class for storing our uploaded IDs.

// Helper case class for storing uploaded IDs
case class UploadId(metrics: DatabaseId, file: DatabaseId)

def processUpload(contents: Array[Byte]): Future[Perhaps[UploadId]] = for {
    // Here we lift from ``Perhaps``
    metrics <- Future.successful(extractMetrics(contents))
    // Here we lift from ``Future``. Remember
    // that we want to store asynchronously,
    // so we need to launch the computation as
    // value declarations
    asyncMetricsId = storeMetrics(metrics).map(id => \/-(id))
    asyncFileId = storeUpload(contents)
    // This is where we actually wait for the
    // store methods in parallel
    metricsId <- asyncMetricsId
    fileId <- asyncFileId
    res = UploadId(metricsId, fileId)
} yield res

val uploadResult: Future[Perhaps[UploadId]] = processUpload(...)

Does the code above work? Not quite. It has to do with the fact that we are now using two layers of abstract types, Future and Perhaps. This means that in this line:

...
    // Here we lift from ``Perhaps``
    metrics <- Future.successful(extractMetrics(contents))

metrics does not evaluate to a Metrics object, but instead to a Perhaps[Metrics] object. The for comprehension unwraps only the Future and not the Perhaps. Consequently, we cannot use metrics directly as an argument for storeMetrics afterwards. We have to unwrap it first, for example:

...
    // Here we lift from ``Perhaps``
    metrics <- Future.successful(extractMetrics(contents))
    asyncMetricsId = metrics match {
        // when metrics is an error type, we still
        // need to wrap it inside a ``Future``
        case -\/(err) => Future.successful(-\/(err))
        case \/-(ok)  => storeMetrics(ok).map(id => \/-(id))
    }

Not only is this too verbose, but it also reads quite unintuitively. Consider also that this is only for one part of the statement. We have to unwrap subsequent statements to make sure Perhaps is also unwrapped. Surely we can do better than this? Indeed we can. The answer lies in another type defined in scalaz, called the EitherT type.

Scalaz’s EitherT

EitherT is a type meant to be used when the disjunction type \/ is wrapped inside some other abstract types (in our case, a Future). Not all abstract types can be used here, and Future itself needs a little bit of enhancement.

Tip

The mathematical term for EitherT is a monad transformer, since \/ is a monad, Future can be made into a monad, and EitherT transforms them both into another monad that is a combination of the two. We are intentionally not using these terms here, but they are common abstractions that pop up here and there in various codebases. Plenty of tutorials and guides about monads can be found online. If you are interested in monad transformers in Scala in particular, we found the guide here and the guide here to be good starting points.

What can we do with EitherT? Essentially it boils down to this: EitherT allows us to unwrap both Future and Perhaps in a single for comprehension statement by wrapping them into another type that combines both Future and Perhaps. The new type, in our case, is called EitherT[Future, ApiPayload, T]. It is not Future[Perhaps], but it needs to be made from Future[Perhaps].

// Alias for the new type, let's call it AsyncPerhaps
type AsyncPerhaps[T] = EitherT[Future, ApiPayload, T]

// From a ``Future[Perhaps]``
val val1: Future[Perhaps[Int]] = ...
val lifted1: AsyncPerhaps[Int] = EitherT(val1)

// From a ``Future``
val val2: Future[Int] = ...
val lifted2: AsyncPerhaps[Int] = EitherT.right(val2)

// From a ``Perhaps``
val val3: Perhaps[Int] = ...
val lifted3: AsyncPerhaps[Int] = EitherT(Future.successful(val3))

// Even from an arbitrary type
val val4: Int = ...
val lifted4: AsyncPerhaps[Int] = val4.point[AsyncPerhaps]

Notice that EitherT can be applied directly to values with a Future[Perhaps] type in our case. Perhaps values still need to be wrapped inside a Future first. For Future values, we use a helper method in EitherT that maps the inner type into Perhaps (essentially what we did earlier when we called Future.map manually). In short, we still need to lift our types into Future[Perhaps] first.
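To get a feeling for what such a wrapper does under the hood, here is a hand-rolled sketch of the idea using only the standard library. It is not scalaz's EitherT, which is more general and implemented differently. The AsyncPerhaps case class, fromPerhaps, fromFuture, and the simplified extractMetrics and storeMetrics are all hypothetical, with Either standing in for \/ and String for ApiPayload.

```scala
import scala.concurrent._
import scala.concurrent.duration._

object TransformerSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Stand-in alias: Either replaces \/, String replaces ApiPayload
  type Perhaps[T] = Either[String, T]

  // A wrapper whose map and flatMap look through both layers at once
  final case class AsyncPerhaps[T](run: Future[Perhaps[T]]) {
    def map[U](f: T => U): AsyncPerhaps[U] =
      AsyncPerhaps(run.map[Perhaps[U]] {
        case Left(err) => Left(err)
        case Right(ok) => Right(f(ok))
      })

    def flatMap[U](f: T => AsyncPerhaps[U]): AsyncPerhaps[U] =
      AsyncPerhaps(run.flatMap[Perhaps[U]] {
        // An error short-circuits: the next step is never invoked
        case Left(err) => Future.successful(Left(err))
        case Right(ok) => f(ok).run
      })
  }

  def fromPerhaps[T](value: Perhaps[T]): AsyncPerhaps[T] =
    AsyncPerhaps(Future.successful(value))

  def fromFuture[T](value: Future[T]): AsyncPerhaps[T] =
    AsyncPerhaps(value.map[Perhaps[T]](result => Right(result)))

  // Simplified stand-ins for the functions used in this guide
  def extractMetrics(contents: Array[Byte]): Perhaps[Int] =
    if (contents.isEmpty) Left("empty upload") else Right(contents.length)

  def storeMetrics(metrics: Int): Future[Long] = Future(metrics.toLong * 10)

  def process(contents: Array[Byte]): Future[Perhaps[Long]] = (for {
    // ``metrics`` is a plain Int here: both layers are unwrapped
    metrics <- fromPerhaps(extractMetrics(contents))
    id <- fromFuture(storeMetrics(metrics))
  } yield id).run

  def main(args: Array[String]): Unit = {
    println(Await.result(process(Array[Byte](1, 2, 3)), 5.seconds))
    println(Await.result(process(Array.empty[Byte]), 5.seconds))
  }
}
```

The manual pattern match on the error and success cases is written exactly once, inside flatMap. Every for comprehension that uses the wrapper then gets the unwrapping for free.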

Using EitherT, our previous iteration then becomes this:

...

def processUpload(contents: Array[Byte]) = for {
    metrics <- EitherT(Future.successful(extractMetrics(contents)))
    asyncMetricsId = storeMetrics(metrics)
    asyncFileId = storeUpload(contents)
    metricsId <- EitherT.right(asyncMetricsId)
    fileId <- EitherT(asyncFileId)
    res = UploadId(metricsId, fileId)
} yield res

val wrappedResult: EitherT[Future, ApiPayload, UploadId] =
    processUpload(...)

There is only one thing left to do, which is to unwrap wrappedResult. Our EitherT type can be considered a helper type that allows us to compose all our functions. The type that is actually useful to us outside of the for comprehension, though, is the Future[Perhaps] type, since our for comprehension already combines both Future and Perhaps. We can convert EitherT[Future, ApiPayload, UploadId] back to Future[Perhaps[UploadId]] by invoking the .run method:

...

def processUpload(contents: Array[Byte]) = for {
    metrics <- EitherT(Future.successful(extractMetrics(contents)))
    asyncMetricsId = storeMetrics(metrics)
    asyncFileId = storeUpload(contents)
    metricsId <- EitherT.right(asyncMetricsId)
    fileId <- EitherT(asyncFileId)
    res = UploadId(metricsId, fileId)
} yield res

val finalResult: Future[Perhaps[UploadId]] =
    processUpload(...).run

And that’s it. We have now combined Future and Perhaps into a single block of computation. EitherT has definitely improved readability, since it spares us from the need to unwrap manually. We are not completely done yet, however. There are some implicit values required by Future (its ExecutionContext) and some implicit methods required by scalaz to make this work. The details can be ignored for our discussion. What’s important to know is that these are all already defined in the FutureMixin trait in the nl.lumc.sasc.sentinel.utils package.

Sentinel’s FutureMixin

There are two things that this trait does that help you combine Future and Perhaps:

  1. It defines all the necessary implicit methods to make Future suitable for EitherT.
  2. It defines an object called ? that you can use to make your for comprehension even more compact.

We will not discuss the implicit methods here, but we would like to highlight the ? object. Recall that our last iteration of the processUpload function looks like this:

...

def processUpload(contents: Array[Byte]) = for {
    metrics <- EitherT(Future.successful(extractMetrics(contents)))
    asyncMetricsId = storeMetrics(metrics)
    asyncFileId = storeUpload(contents)
    metricsId <- EitherT.right(asyncMetricsId)
    fileId <- EitherT(asyncFileId)
    res = UploadId(metricsId, fileId)
} yield res

The ? object wraps all the EitherT calls necessary for lifting our types in a single function called <~. The names are not exactly pronounceable, but for good reason. Since you can omit the dot (.) and parentheses (( and )) when calling an object’s function, you can then do this with the ? object:

...

// This must be done in an object extending the
// `FutureMixin` trait now.
def processUpload(contents: Array[Byte]) = for {
    metrics <- ? <~ extractMetrics(contents)
    asyncMetricsId = storeMetrics(metrics)
    asyncFileId = storeUpload(contents)
    metricsId <- ? <~ asyncMetricsId
    fileId <- ? <~ asyncFileId
    res = UploadId(metricsId, fileId)
} yield res

Notice there that we don’t need to call EitherT manually again. For the first statement, for example, we are doing:

...

    metrics <- ? <~ extractMetrics(contents)

Which is essentially the same as:

...

    metrics <- ?.<~(extractMetrics(contents))

Some would still prefer to use EitherT here, and that is fine. The ? object is simply there to give you the option to shorten your EitherT instantiations by leveraging Scala’s features.
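To illustrate how an object with an overloaded operator-like method can hide the lifting calls, here is a hypothetical sketch built on the standard library alone. It is not the actual FutureMixin implementation. It assumes Scala 2 (where ? is a legal identifier), uses Either in place of \/ and String in place of ApiPayload, and reuses a minimal hand-rolled wrapper instead of scalaz's EitherT. Only two overloads are shown, and the simplified storeMetrics here already returns a Future[Perhaps].

```scala
import scala.concurrent._
import scala.concurrent.duration._

object QuestionMarkSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Stand-in alias: Either replaces \/, String replaces ApiPayload
  type Perhaps[T] = Either[String, T]

  // Minimal hand-rolled wrapper playing the role of EitherT
  final case class AsyncPerhaps[T](run: Future[Perhaps[T]]) {
    def flatMap[U](f: T => AsyncPerhaps[U]): AsyncPerhaps[U] =
      AsyncPerhaps(run.flatMap[Perhaps[U]] {
        case Left(err) => Future.successful(Left(err))
        case Right(ok) => f(ok).run
      })

    def map[U](f: T => U): AsyncPerhaps[U] =
      flatMap(ok => AsyncPerhaps(Future.successful[Perhaps[U]](Right(f(ok)))))
  }

  // Hypothetical lifting object: one method name, overloaded per input type
  object ? {
    def <~[T](value: Perhaps[T]): AsyncPerhaps[T] =
      AsyncPerhaps(Future.successful(value))

    def <~[T](value: Future[Perhaps[T]]): AsyncPerhaps[T] =
      AsyncPerhaps(value)
  }

  // Simplified stand-ins for the functions used in this guide
  def extractMetrics(contents: Array[Byte]): Perhaps[Int] =
    if (contents.isEmpty) Left("empty upload") else Right(contents.length)

  def storeMetrics(metrics: Int): Future[Perhaps[Long]] =
    Future(Right(metrics.toLong * 10))

  def process(contents: Array[Byte]): Future[Perhaps[Long]] = (for {
    // ``? <~ x`` is the infix form of ``?.<~(x)``
    metrics <- ? <~ extractMetrics(contents)
    id <- ? <~ storeMetrics(metrics)
  } yield id).run

  def main(args: Array[String]): Unit = {
    println(Await.result(process(Array[Byte](1, 2, 3)), 5.seconds))
    println(Await.result(process(Array.empty[Byte]), 5.seconds))
  }
}
```

The compiler picks the overload from the static type of the argument, so the call site looks the same regardless of which kind of value is being lifted.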

Next Steps

We have covered a lot now! Now you should be ready to implement all the things we have learned in a real Sentinel Processor. Head over to the next section to do just that.