HiveBrain v1.2.0
pattern · csharp · Minor

Ported C# scheduling to Scala

Submitted by: @import:stackexchange-codereview
scala · scheduling · ported

Problem

I'm taking a design workshop and we've received a short task:

  • To parse an input file of processor jobs.
  • To simulate scheduling of those jobs on a parallel computer (each job needs X processors).
  • The scheduling should be FCFS, but it should be easy to swap it out for a different scheduler.

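One way to meet the "easy to swap out" requirement is to make the scheduling policy a pure function behind a small trait, so the simulator never depends on a concrete scheduler. This is a sketch under assumptions: `Job`, `SchedulingPolicy`, `Fcfs` and `FirstFit` are hypothetical names, not taken from the original code, and a policy here only decides which waiting jobs start given the number of free processors.

```scala
// Hypothetical job record: id, processors needed, run time.
final case class Job(id: Int, processors: Int, runTime: Long)

// A scheduling policy picks which waiting jobs start now.
// It returns (jobs to start, jobs that keep waiting).
trait SchedulingPolicy {
  def select(freeProcessors: Int, waiting: Vector[Job]): (Vector[Job], Vector[Job])
}

// First-come, first-served: start jobs strictly in arrival order and stop at the
// first job that does not fit, so later jobs never overtake it.
object Fcfs extends SchedulingPolicy {
  def select(freeProcessors: Int, waiting: Vector[Job]): (Vector[Job], Vector[Job]) = {
    // running totals of processors needed by the first k jobs
    val cumulative = waiting.scanLeft(0)(_ + _.processors).tail
    val fitting = cumulative.takeWhile(_ <= freeProcessors).length
    waiting.splitAt(fitting)
  }
}

// A different policy drops in without touching the simulator, e.g. first fit:
// start every waiting job that still fits, scanning in arrival order.
object FirstFit extends SchedulingPolicy {
  def select(freeProcessors: Int, waiting: Vector[Job]): (Vector[Job], Vector[Job]) = {
    val (started, _, rest) =
      waiting.foldLeft((Vector.empty[Job], freeProcessors, Vector.empty[Job])) {
        case ((go, free, stay), job) =>
          if (job.processors <= free) (go :+ job, free - job.processors, stay)
          else (go, free, stay :+ job)
      }
    (started, rest)
  }
}
```

With 6 free processors and waiting jobs that need 4, 3 and 1 processors, `Fcfs` starts only the first job, while `FirstFit` starts the first and the third.
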
I wrote the code in C# (in a few minutes) and decided this was a good opportunity to brush up on my Scala, which I have neglected for two years. Having done Haskell before, I feel that I'm not using Scala's capabilities. The whole code feels very stateful, and I'm sure there are things I'm complicating that could be much simpler in idiomatic Scala.

A bit about the structure: since I'm simulating a long stretch of time and can't step through it tick by tick, I'm using a priority queue (ordered on time) for my events. To avoid rescheduling two events to the same time, I always move unsuccessful scheduling attempts to the next job-done event. I also feel that the code could be quicker (it takes 38 seconds on my laptop on the sample file).
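If you keep the priority-queue design, note that `scala.collection.mutable.PriorityQueue` dequeues the largest element first, so an event queue ordered on time needs a reversed `Ordering` to hand out the earliest event first. A minimal sketch; the `Event` type and the `EventQueue` helper are hypothetical and stand in for the original event classes:

```scala
import scala.collection.mutable

// Hypothetical event: something that happens to a job at a simulated time.
final case class Event(time: Long, jobId: Int)

object EventQueue {
  // PriorityQueue is a max-heap, so reverse the ordering to dequeue the earliest time first.
  // Ties on time are broken by job id to keep the order deterministic.
  implicit val earliestFirst: Ordering[Event] =
    Ordering.by((e: Event) => (e.time, e.jobId)).reverse

  // The implicit ordering above is picked up here.
  def empty: mutable.PriorityQueue[Event] = mutable.PriorityQueue.empty[Event]

  // Remove every event, earliest first.
  def drain(q: mutable.PriorityQueue[Event]): List[Event] = {
    val out = List.newBuilder[Event]
    while (q.nonEmpty) out += q.dequeue()
    out.result()
  }
}
```

Breaking ties on a second key keeps the order of simultaneous events deterministic, which matters when several jobs finish at the same instant.
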

I would really appreciate any improvement. To be fair, this is a university assignment, but asking on sites such as this one was explicitly allowed.

```
import EventType.EventType
import scala.collection.mutable
import scala.io.Source

object Main {
  def main(args: Array[String]) {
    val sw = System.currentTimeMillis()
    val lines = Source.fromFile( """c:\data\data.swf""").getLines()
    val maxProcessors = lines.find(_.contains("MaxProcs")).get.split(":\\s+")(1).toInt
    val data = lines.dropWhile(_.startsWith(";")).map(parseSWFJob)

    val scheduler = new FCFSScheduler(numProcessors = maxProcessors)
    data.map(scheduler.notify).flatMap(_ => scheduler.getDoneJobs).map(_.id).foreach(println) // pump
    scheduler.noMoreJobs()
    println(scheduler.getDoneJobs) // this writes to a file in the real code
    println("Time diff millis" + (System.currentTimeMillis() - sw))
  }
  def parseSWFJob(str:String) = {
    val xs = str.trim.split("\\s
```

Solution

Anything to do with time is inherently stateful, but your code doesn't have to embrace statefulness quite so fully, and Scala gives you some tools to address that.

Scala has simple construction of parameterized objects: constructor parameters can serve directly as fields. You can save yourself some code (and some obscurity) by using this. There is no reason, in class JobEvent, say, to copy eventJob into job.
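For example, with hypothetical fields since the original `JobEvent` is not shown in full: making the parameters case-class fields (or declaring them `val`) removes the need for a separate copy. The `endTime` and `startTime` members below are assumptions for illustration.

```scala
// Hypothetical job record.
final case class Job(id: Int, processors: Int, runTime: Long)

// Instead of
//   class JobEvent(eventJob: Job, eventTime: Long) { val job = eventJob; val time = eventTime }
// let the constructor parameters be the fields:
final case class JobEvent(job: Job, endTime: Long) {
  // derived values can still be computed from the parameters
  def startTime: Long = endTime - job.runTime
}
```

A case class also provides structural equality, a readable `toString` and `copy`, which suit immutable simulation state.
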

One very useful way to remove state is to make your job dispatcher a transformer. It takes a tuple (in the mathematical sense) of processors used, current time, jobs being processed, jobs waiting and jobs done, and transforms it into a new tuple. This lets you use immutable data structures and removes state from your operations.

```
abstract class ComputerSchedule(
  processorsUsed: Int,        // number of processors occupied with jobs
  currentTime: Int,           // current simulated time
  inProgress: List[JobEvent], // jobs running, ordered by ending time
  waiting: List[Job],         // jobs waiting to start, in arrival order
  jobsDone: List[JobEvent]    // finished jobs, ordered by ending time
) {

  def isDone: Boolean = inProgress.isEmpty && waiting.isEmpty

  def nextState: ComputerSchedule
}
```


You then create an FcfsScheduler subclass that implements the nextState method.

The transformation method finds the next (set of) JobEvents that are finishing from the inProgress queue, calculates the available number of processors, goes to the waiting queue to find suitable candidate jobs, creates JobEvents for each one and adds them to the newly constructed inProgress queue, depleting the waiting queue.

```
class FcfsScheduler(
  processorsUsed: Int,
  currentTime: Int,
  inProgress: List[JobEvent],
  waiting: List[Job],
  jobsDone: List[JobEvent]
) extends ComputerSchedule(processorsUsed, currentTime, inProgress, waiting, jobsDone) {
  def nextState: ComputerSchedule = {
    // find all jobs that fit the number of processors available
    def findNextJobs(processorsAvailable: Int, candidates: List[Job]): (List[Job], List[Job]) =
      candidates match {
        case Nil => (Nil, Nil)
        case job :: tail if job.requiredProcessors <= processorsAvailable =>
          val (remainingJobs, remainingCandidates) = findNextJobs(processorsAvailable - job.requiredProcessors, tail)
          (job :: remainingJobs, remainingCandidates)
        case _ => (Nil, candidates)
      }
    // add to list by order of ending time
    def addActiveJob(j: JobEvent, jl: List[JobEvent]): List[JobEvent] = ???
    // add list of Jobs to in-progress list
    def addNextJobs(j: List[Job], jl: List[JobEvent]): List[JobEvent] = ???
    // and so on: use the helpers to build and return the next FcfsScheduler
    ???
  }
}
```

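The two `???` helpers could be filled in along these lines. This sketch assumes a `JobEvent` that records its `endTime` and passes `currentTime` to `addNextJobs` explicitly; both are assumptions, not part of the original design, and `ActiveJobs` is a hypothetical wrapper object.

```scala
// Hypothetical records standing in for the original Job and JobEvent.
final case class Job(id: Int, processors: Int, runTime: Long)
final case class JobEvent(job: Job, endTime: Long)

object ActiveJobs {
  // add to list by order of ending time; events with equal end times keep insertion order
  def addActiveJob(j: JobEvent, jl: List[JobEvent]): List[JobEvent] = {
    val (before, after) = jl.span(_.endTime <= j.endTime)
    before ::: j :: after
  }

  // add list of Jobs to the in-progress list, each finishing runTime after currentTime
  def addNextJobs(currentTime: Long, j: List[Job], jl: List[JobEvent]): List[JobEvent] =
    j.foldLeft(jl)((acc, job) => addActiveJob(JobEvent(job, currentTime + job.runTime), acc))
}
```

Because the in-progress list stays sorted, the next finishing events are always at its head, which is what the transformation step reads first.
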

Your simulator then consists of iterating scheduler.nextState until scheduler.isDone is true. Because every state is an immutable value, keeping the intermediate states gives you a complete history of your simulation.
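A self-contained sketch of that loop, under several assumptions that are not in the original: a case class holds the state instead of a class hierarchy, jobs carry a hypothetical `submit` time, the FCFS rule starts the longest prefix of the queue that fits, and every job requests no more processors than the machine has (otherwise the queue could never drain). The clock jumps straight to the next submission or completion, so there is no priority queue and no retrying of failed schedules. All type and method names are hypothetical.

```scala
import scala.annotation.tailrec

// Hypothetical job record; `submit` is the time the job enters the queue.
final case class Job(id: Int, submit: Long, processors: Int, runTime: Long)
// A job that has started, with the time it will finish.
final case class Running(job: Job, endTime: Long)

// Immutable simulation state: each step builds a new State instead of mutating.
final case class State(
  time: Long,
  free: Int,               // processors currently idle
  arrivals: List[Job],     // not yet submitted, ordered by submit time
  waiting: Vector[Job],    // submitted but not started, in FCFS order
  running: List[Running],  // started, ordered by end time
  done: List[Running]      // finished, most recent first
) {
  def isDone: Boolean = arrivals.isEmpty && waiting.isEmpty && running.isEmpty

  // Jump to the next submission or completion, then start what FCFS allows.
  // Assumes !isDone and that every job fits on the empty machine.
  def nextState: State = {
    val nextTime = (arrivals.headOption.map(_.submit).toList ++
      running.headOption.map(_.endTime).toList).min
    val (finished, stillRunning) = running.span(_.endTime <= nextTime)
    val (arrived, later) = arrivals.span(_.submit <= nextTime)
    val queue = waiting ++ arrived
    val available = free + finished.map(_.job.processors).sum
    // strict FCFS: start the longest prefix of the queue whose total demand fits
    val fits = queue.scanLeft(0)(_ + _.processors).tail.takeWhile(_ <= available).length
    val (starting, stillWaiting) = queue.splitAt(fits)
    State(
      time = nextTime,
      free = available - starting.map(_.processors).sum,
      arrivals = later,
      waiting = stillWaiting,
      running = (stillRunning ++ starting.map(j => Running(j, nextTime + j.runTime))).sortBy(_.endTime),
      done = finished.reverse ::: done
    )
  }
}

object Simulation {
  def initial(processors: Int, jobs: Seq[Job]): State =
    State(0L, processors, jobs.sortBy(_.submit).toList, Vector.empty, Nil, Nil)

  // Iterate nextState until the simulation is done and return the final state.
  def run(start: State): State =
    Iterator.iterate(start)(_.nextState).find(_.isDone).get

  // Every state from `start` up to and including the final one.
  def history(start: State): List[State] = {
    @tailrec
    def loop(s: State, acc: List[State]): List[State] =
      if (s.isDone) (s :: acc).reverse
      else loop(s.nextState, s :: acc)
    loop(start, Nil)
  }
}
```

`Simulation.run` keeps only the final state, while `Simulation.history` returns every intermediate state. Each step consumes at least one submission or completion, so the number of steps is bounded by twice the number of jobs.
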


Context

StackExchange Code Review Q#69291, answer score: 2
