Creating the Processors¶
In this section we will define the processor objects required for processing summary files from our Maple pipeline. Recall that processors are the actual objects where pipeline support (that means summary file upload and statistics querying) is implemented. There are two processors we need to define: a runs processor to process data uploaded by a user and a stats processor to serve statistics queries.
The Runs Processor¶
First up is the runs processor. We mentioned earlier that it is meant for processing user uploads. This means extracting all the values we need into the various record objects we defined earlier. Additionally, Sentinel requires that the raw contents of the file be saved into the database. The purpose of this is twofold: to ensure duplicate files are not uploaded and to allow users to download their summary files again. Sentinel also requires you to save the extracted record objects into the database. These are the actual metrics that will be returned when a user queries for them later on.
We will start with the general outline and then define the functions required for processing the actual uploaded run summary file. All of this will be in a file called MapleRunsProcessor.scala:
package nl.lumc.sasc.sentinel.exts.maple
import scala.concurrent._
import org.bson.types.ObjectId
import org.json4s.JValue
import nl.lumc.sasc.sentinel.adapters._
import nl.lumc.sasc.sentinel.models.User
import nl.lumc.sasc.sentinel.processors.RunsProcessor
import nl.lumc.sasc.sentinel.utils.{ ValidatedJsonExtractor, MongodbAccessObject }
/**
* Example of a simple pipeline runs processor.
*
* @param mongo MongoDB access object.
*/
class MapleRunsProcessor(mongo: MongodbAccessObject)
extends RunsProcessor(mongo)
with ReadGroupsAdapter
with ValidatedJsonExtractor {
// Our code will be here
}
Our runs processor extends the RunsProcessor abstract class, which is instantiated with an object that provides access to the database. It is further enriched by the ReadGroupsAdapter trait, since we want to process data at both the sample and read group levels, and by the ValidatedJsonExtractor trait, since we want to parse and validate our incoming JSON.
It is important to note here that the RunsProcessor abstract class extends FutureMixin, so we will need to define an implicit ExecutionContext as well.
Let’s now define some of the required abstract values and methods. We can start with two simple ones: the pipeline name and the pipeline schema. The pipeline name is how the pipeline will be referred to in URL parameters (which means it is safest to use only alphanumeric characters), and the pipeline schema is the resource path to the schema we defined earlier.
...
class MapleRunsProcessor(mongo: MongodbAccessObject)
extends RunsProcessor(mongo)
with ReadGroupsAdapter
with ValidatedJsonExtractor {
/** Exposed pipeline name. */
def pipelineName = "maple"
/** JSON schema for incoming summaries. */
def jsonSchemaUrls = Seq("/schema_examples/maple.json")
}
Next is to define the Future execution context and the record objects. Aliasing the record objects in the runs processor here is a requirement of both the RunsProcessor abstract class and the ReadGroupsAdapter trait. It allows the generic methods in RunsProcessor to work with our custom record types.
...
class MapleRunsProcessor(mongo: MongodbAccessObject)
extends RunsProcessor(mongo)
with ReadGroupsAdapter
with ValidatedJsonExtractor {
...
/** Implicit execution context. */
implicit private def context: ExecutionContext =
ExecutionContext.global
/** Run records container. */
type RunRecord = MapleRunRecord
/** Sample-level metrics container. */
type SampleRecord = MapleSampleRecord
/** Read group-level metrics container. */
type ReadGroupRecord = MapleReadGroupRecord
}
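As a side note, ExecutionContext.global is simply the easiest choice for this guide. If you want more control over threading, the implicit context can instead be backed by a pool of your own. Here is a minimal sketch using only the standard library (our own variation, not something Sentinel requires):
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
// Inside MapleRunsProcessor, instead of ExecutionContext.global.
// Defined as a val so the pool is created only once.
implicit private val context: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))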
Now we want to define how the record objects can be created from a parsed JSON document. The exact implementation of this part is completely up to you. You can do this with one function, two functions, or more, and you can call the functions anything you want (so long as they do not clash with any parent trait methods). Basically, the details will differ depending on the JSON file’s structure.
In our case, one way to do this is using an extractUnits function and a helper case class MapleUnits, which will contain the MapleSampleRecord and MapleReadGroupRecord objects. The implementation looks like this:
...
class MapleRunsProcessor(mongo: MongodbAccessObject)
extends RunsProcessor(mongo)
with ReadGroupsAdapter
with ValidatedJsonExtractor {
...
/** Helper case class for storing records. */
case class MapleUnits(
samples: Seq[MapleSampleRecord],
readGroups: Seq[MapleReadGroupRecord])
/**
* Extracts the raw summary JSON into samples and read groups containers.
*
* @param runJson Raw run summary JSON.
* @param uploaderId Username of the uploader.
* @param runId Database ID for the run record.
* @return Two sequences: one for sample data and the other for read group data.
*/
def extractUnits(runJson: JValue, uploaderId: String,
runId: ObjectId): MapleUnits = {
/** Name of the current run. */
val runName = (runJson \ "run_name").extractOpt[String]
/** Given the sample name, read group name, and JSON section of the read group, create a read group container. */
def makeReadGroup(sampleName: String, readGroupName: String, readGroupJson: JValue) =
MapleReadGroupRecord(
stats = MapleReadGroupStats(
nReadsInput = (readGroupJson \ "nReadsInput").extract[Long],
nReadsAligned = (readGroupJson \ "nReadsAligned").extract[Long]),
uploaderId = uploaderId,
runId = runId,
readGroupName = Option(readGroupName),
sampleName = Option(sampleName),
runName = runName)
/** Given the sample name and JSON section of the sample, create a sample container. */
def makeSample(sampleName: String, sampleJson: JValue) =
MapleSampleRecord(
stats = MapleSampleStats(nSnps = (sampleJson \ "nSnps").extract[Long]),
uploaderId, runId, Option(sampleName), runName)
/** Raw sample and read group containers. */
val parsed = (runJson \ "samples").extract[Map[String, JValue]].view
.map {
case (sampleName, sampleJson) =>
val sample = makeSample(sampleName, sampleJson)
val readGroups = (sampleJson \ "readGroups").extract[Map[String, JValue]]
.map { case (readGroupName, readGroupJson) => makeReadGroup(sampleName, readGroupName, readGroupJson) }
.toSeq
(sample, readGroups)
}.toSeq
MapleUnits(parsed.map(_._1), parsed.flatMap(_._2))
}
}
To be fair, that is still quite verbose and there are possibly other ways of doing it. As we mentioned, though, this depends largely on what your JSON file looks like. It is often the case that this is the most complex part of the code that you need to define.
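To make the expected input concrete, here is a small, hypothetical usage sketch. The summary structure is inferred from the extraction code above, the values are made up, and it assumes the json4s Jackson backend is on the classpath (swap in the native one if that is what your build uses) and that processor is an instance of MapleRunsProcessor:
// A REPL- or test-style sketch; names and values here are illustrative only.
import org.bson.types.ObjectId
import org.json4s.jackson.JsonMethods._
val exampleSummary = parse("""
  {
    "run_name": "maple_demo",
    "samples": {
      "sampleA": {
        "nSnps": 100,
        "readGroups": {
          "rg1": { "nReadsInput": 10000, "nReadsAligned": 9500 }
        }
      }
    }
  }
""")
// Assuming `processor` is a MapleRunsProcessor instance:
// val units = processor.extractUnits(exampleSummary, "devUser", new ObjectId)
// units.samples.head.stats.nSnps            // 100
// units.readGroups.head.stats.nReadsAligned // 9500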
The final part that we need to define is the actual function for processing the upload. This is where we combine all the functions we have defined earlier (and some that are already defined by the traits we are extending) in one place. It covers everything from the moment the user uploads a file up until the point where we create a RunRecord object to be sent back to the user as a JSON payload, notifying them that the upload has been successful.
The function is called processRunUpload and is a requirement of the RunsProcessor abstract class. It has the following signature:
/**
* Processes and stores the given uploaded file to the run records collection.
*
* @param contents Upload contents as a byte array.
* @param uploadName File name of the upload.
* @param uploader Uploader of the run summary file.
* @return A run record of the uploaded run summary file.
*/
def processRunUpload(
contents: Array[Byte],
uploadName: String,
uploader: User): Future[Perhaps[RunRecord]]
It is invoked by Sentinel’s RunsController after user authentication (hence the uploader argument). You do not need to worry about uploadName or uploader at this point. The important thing is to note the return type: Future[Perhaps[RunRecord]]. We have covered this in our earlier guides. This is where we now actually implement working code for processing the user upload.
There are of course several different ways to implement processRunUpload. Here is the one we use, to give you an idea:
class MapleRunsProcessor(mongo: MongodbAccessObject)
extends RunsProcessor(mongo)
with ReadGroupsAdapter
with ValidatedJsonExtractor {
...
def processRunUpload(contents: Array[Byte], uploadName: String, uploader: User) = {
val stack = for {
// Make sure it is JSON
runJson <- ? <~ extractAndValidateJson(contents)
// Store the raw file in our database
fileId <- ? <~ storeFile(contents, uploader, uploadName)
// Extract samples and read groups
units <- ? <~ extractUnits(runJson, uploader.id, fileId)
// Invoke store methods asynchronously
storeSamplesResult = storeSamples(units.samples)
storeReadGroupsResult = storeReadGroups(units.readGroups)
// Check that all store methods are successful
_ <- ? <~ storeReadGroupsResult
_ <- ? <~ storeSamplesResult
// Create run record
sampleIds = units.samples.map(_.dbId)
readGroupIds = units.readGroups.map(_.dbId)
run = MapleRunRecord(fileId, uploader.id, pipelineName, sampleIds, readGroupIds)
// Store run record into database
_ <- ? <~ storeRun(run)
} yield run
stack.run
}
}
Our implementation above consists of a series of function calls: parsing and validating the JSON, storing the raw uploaded bytes, extracting the record objects, and then storing the record objects. Everything is wrapped inside EitherT[Future, ApiPayload, RunRecord] and stored as a value called stack. This of course means we still need to invoke the .run method in order to get the Future[Perhaps[RunRecord]] object which we will return.
We hope by now it is also clear that this single for comprehension block already has error handling with the ApiPayload type built in, and that we write to the database asynchronously whenever possible.
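If the EitherT wrapping still feels opaque, here is a minimal, self-contained sketch of the same pattern using scalaz’s EitherT directly (scalaz 7.1/7.2-style API), with made-up step names instead of Sentinel’s ? <~ helper. Each step yields a Future of a disjunction, the for comprehension short-circuits on the first error, and .run unwraps the stack back into a plain Future (here Future[String \/ Int], analogous to our Future[Perhaps[RunRecord]]):
import scala.concurrent.{ ExecutionContext, Future }
import scalaz.{ -\/, \/, \/-, EitherT }
import scalaz.std.scalaFuture._
object EitherTSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global
  // A step that either succeeds with an Int or fails with an error message,
  // wrapped in EitherT so it can be chained in a for comprehension.
  def step(n: Int): EitherT[Future, String, Int] =
    EitherT(Future.successful(if (n >= 0) \/-(n) else -\/("negative input")))
  // The same shape as the processRunUpload stack, minus the database calls:
  // the first failing step short-circuits the rest of the comprehension.
  val stack: EitherT[Future, String, Int] = for {
    a <- step(1)
    b <- step(2)
  } yield a + b
  // Unwrap the transformer, just as stack.run does in processRunUpload.
  val result: Future[String \/ Int] = stack.run
}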
Here is our finished, complete MapleRunsProcessor for your reference:
package nl.lumc.sasc.sentinel.exts.maple
import scala.concurrent._
import org.bson.types.ObjectId
import org.json4s.JValue
import nl.lumc.sasc.sentinel.adapters._
import nl.lumc.sasc.sentinel.models.User
import nl.lumc.sasc.sentinel.processors.RunsProcessor
import nl.lumc.sasc.sentinel.utils.{ ValidatedJsonExtractor, MongodbAccessObject }
/**
* Example of a simple pipeline runs processor.
*
* @param mongo MongoDB access object.
*/
class MapleRunsProcessor(mongo: MongodbAccessObject)
extends RunsProcessor(mongo)
with ReadGroupsAdapter
with ValidatedJsonExtractor {
/** Exposed pipeline name. */
def pipelineName = "maple"
/** JSON schema for incoming summaries. */
def jsonSchemaUrls = Seq("/schema_examples/maple.json")
/** Run records container. */
type RunRecord = MapleRunRecord
/** Sample-level metrics container. */
type SampleRecord = MapleSampleRecord
/** Read group-level metrics container. */
type ReadGroupRecord = MapleReadGroupRecord
/** Execution context. */
implicit private def context: ExecutionContext = ExecutionContext.global
/** Helper case class for storing records. */
case class MapleUnits(
samples: Seq[MapleSampleRecord],
readGroups: Seq[MapleReadGroupRecord])
/**
* Extracts the raw summary JSON into samples and read groups containers.
*
* @param runJson Raw run summary JSON.
* @param uploaderId Username of the uploader.
* @param runId Database ID for the run record.
* @return Two sequences: one for sample data and the other for read group data.
*/
def extractUnits(runJson: JValue, uploaderId: String,
runId: ObjectId): MapleUnits = {
/** Name of the current run. */
val runName = (runJson \ "run_name").extractOpt[String]
/** Given the sample name, read group name, and JSON section of the read group, create a read group container. */
def makeReadGroup(sampleName: String, readGroupName: String, readGroupJson: JValue) =
MapleReadGroupRecord(
stats = MapleReadGroupStats(
nReadsInput = (readGroupJson \ "nReadsInput").extract[Long],
nReadsAligned = (readGroupJson \ "nReadsAligned").extract[Long]),
uploaderId = uploaderId,
runId = runId,
readGroupName = Option(readGroupName),
sampleName = Option(sampleName),
runName = runName)
/** Given the sample name and JSON section of the sample, create a sample container. */
def makeSample(sampleName: String, sampleJson: JValue) =
MapleSampleRecord(
stats = MapleSampleStats(nSnps = (sampleJson \ "nSnps").extract[Long]),
uploaderId, runId, Option(sampleName), runName)
/** Raw sample and read group containers. */
val parsed = (runJson \ "samples").extract[Map[String, JValue]].view
.map {
case (sampleName, sampleJson) =>
val sample = makeSample(sampleName, sampleJson)
val readGroups = (sampleJson \ "readGroups").extract[Map[String, JValue]]
.map { case (readGroupName, readGroupJson) => makeReadGroup(sampleName, readGroupName, readGroupJson) }
.toSeq
(sample, readGroups)
}.toSeq
MapleUnits(parsed.map(_._1), parsed.flatMap(_._2))
}
/**
* Validates and stores uploaded run summaries.
*
* @param contents Upload contents as a byte array.
* @param uploadName File name of the upload.
* @param uploader Uploader of the run summary file.
* @return A run record of the uploaded run summary file or a list of error messages.
*/
def processRunUpload(contents: Array[Byte], uploadName: String, uploader: User) = {
val stack = for {
// Make sure it is JSON
runJson <- ? <~ extractAndValidateJson(contents)
// Store the raw file in our database
fileId <- ? <~ storeFile(contents, uploader, uploadName)
// Extract samples and read groups
units <- ? <~ extractUnits(runJson, uploader.id, fileId)
// Invoke store methods asynchronously
storeSamplesResult = storeSamples(units.samples)
storeReadGroupsResult = storeReadGroups(units.readGroups)
// Check that all store methods are successful
_ <- ? <~ storeReadGroupsResult
_ <- ? <~ storeSamplesResult
// Create run record
sampleIds = units.samples.map(_.dbId)
readGroupIds = units.readGroups.map(_.dbId)
run = MapleRunRecord(fileId, uploader.id, pipelineName, sampleIds, readGroupIds)
// Store run record into database
_ <- ? <~ storeRun(run)
} yield run
stack.run
}
}
And that’s it! You now have a fully-functioning runs processor.
The Stats Processor¶
The final step is defining the stats processor. This will be simpler than the runs processor, since Sentinel now has a better idea of what to expect from the database records (courtesy of the record objects we defined earlier).
package nl.lumc.sasc.sentinel.exts.maple
// Import paths below follow the pattern used in MapleRunsProcessor; adjust them
// if StatsProcessor or AccLevel live elsewhere in your Sentinel version.
import nl.lumc.sasc.sentinel.AccLevel
import nl.lumc.sasc.sentinel.processors.StatsProcessor
import nl.lumc.sasc.sentinel.utils.MongodbAccessObject
class MapleStatsProcessor(mongo: MongodbAccessObject)
extends StatsProcessor(mongo) {
def pipelineName = "maple"
/** Function for retrieving Maple sample data points. */
def getMapleSampleStats =
getStats[MapleSampleStats]("stats")(AccLevel.Sample) _
/** Function for aggregating over Maple sample data points. */
def getMapleSampleAggrStats =
getAggregateStats[MapleSampleStatsAggr]("stats")(AccLevel.Sample) _
/** Function for retrieving Maple read group data points. */
def getMapleReadGroupStats =
getStats[MapleReadGroupStats]("stats")(AccLevel.ReadGroup) _
/** Function for aggregating over Maple read group data points. */
def getMapleReadGroupAggrStats =
getAggregateStats[MapleReadGroupStatsAggr]("stats")(AccLevel.ReadGroup) _
}
And that is all we need to have a fully functioning MapleStatsProcessor. Notice that what we are doing here is defining partially applied functions (the trailing _ at the end of each definition is how we do partial application in Scala).
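If the trailing underscore looks unfamiliar, here is a tiny standalone illustration of partial application in plain Scala (the names below are made up and unrelated to Sentinel):
object PartialApplicationSketch {
  // A method with two parameter lists, like getStats and getAggregateStats.
  def greet(prefix: String)(name: String): String = s"$prefix, $name!"
  // Supply the first parameter list and leave the last one open with `_`:
  // the result is a reusable function value of type String => String.
  val greetFormally: String => String = greet("Dear") _
  def main(args: Array[String]): Unit =
    println(greetFormally("Maple user")) // prints: Dear, Maple user!
}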
In addition to pipelineName, which has the same meaning and use as the pipelineName in MapleRunsProcessor, there are four functions we define here. These four functions call two functions that are defined in the StatsProcessor abstract class: getStats and getAggregateStats.
Let’s take a look at getStats as invoked in getMapleSampleStats first. Here we are calling it with one type parameter, the MapleSampleStats type. This is a type we defined earlier to contain our sample-level metrics. In essence, this is what getStats does: upon a user query, it creates sample-level metrics container objects from our previously stored database records. It does that by reading the MapleSampleRecord object and extracting an attribute from that object whose type is MapleSampleStats. getStats is not smart enough to know which attribute that is, however, so we need to supply the attribute name as an argument as well. In our case, this attribute is called stats, and indeed we use stats here as the first value argument to getStats. The final argument, AccLevel.Sample, simply tells getStats that we want this query to operate at the sample level instead of the read group level.
getStats in getMapleReadGroupStats is called with the same logic. The only difference is the final argument, where we use AccLevel.ReadGroup, as we want this function to operate at the read group level.
getAggregateStats is not exactly the same, but it is also called with the same logic here. The main difference is that the returned object is a single object containing various aggregated values.
With this, we have completely defined the required processors and internal data models. The next step is to expose these processors via the HTTP controllers.