Computer Science Experimentation

Monday, September 27, 2010

F# parallel programming using Agents

Consider a parallel program as a series of independent components that send and receive messages.This is referred as the agent model.

When an actor receives a message, the message is placed in a queue until the agent is ready to process it. When an agent process a message, it makes a decision about what to do with it based on its internal state and contents of the message. It might do processing, send a reply, send a new message for a different agent, etc.

F# provides the MailboxProcessor class for agent processing using Asynchronous Programming or more specific, Asynchronous Workflows.

Asynchronous Programming describes programs and operations that once started are executed in background and terminate at some later time.

Asynchronous Workflow allow you to perform asynchronous operations without the need for explicit callbacks. So you can write the code as if it were using synchronous execution, but actually, the code will execute asynchronously, supending and resuming as asynchronous operation complete.

In Asynchronous Programming, you want to avoid blocking of threads. Threads are expensive because allocate a 1 MB stack and other operational system implications.
In performance critical code, it is important to keep the number of blocked threads low. Ideally, you will only have as many as you have processors and you will have no blocked threads.

There are several topics related to the usage of MailboxProcessor. In this document I will analyze the handling of agent's persisted variables.

The code for the first agent example is:

let Act_RandomAccum= 
    MailboxProcessor.Start(fun (i:MailboxProcessor) ->
        let rec loop state =
            async {
                    let! r= i.Receive()
                    let z=new Random()
                    let v=z.NextDouble()
                    printfn "state=%f" (state+v)
                    return! loop(state+v)}
        loop 0.0)
(*
state=0.761920
state=1.613361
state=1.717945
state=1.912050
...
*)

Let's create an function that will run continuously and it will post messages to agents every 3 seconds (asynchronously). You can cancel it by executing Async.CancelDefaultToken().

let Act_clk=
    async {
            while true do
                do Act_RandomAccum.Post(1)
                //do Act_3States.Post(1)
                //do Act_3States_2.Post(1)
                do! Async.Sleep(3*1000)
           }

let cancelHandler(ex : OperationCanceledException)=
    printfn "Task canceled"

Async.TryCancelled(Act_clk, cancelHandler) 
|> Async.Start

Notice how variables are keep by being a input to the loop.
This is not covinient when you want to preserve several states.

The next snippet uses a record to preserve several variables. Change the comment lines in Act_clk to send messages to the following agent.

type States= {mutable s1:float; mutable s2:float; mutable s3:float; mutable sum:float; mutable Status: string}

let States1={s1=0.0;s2=0.0;s3=0.0;sum=0.0;Status="good"}

let Act_Random3States= 
    MailboxProcessor.Start(fun (i:MailboxProcessor) ->
        let rec loop (s:States) =
            async {
                    let! r= i.Receive()
                    let z=new Random()
                    let v=z.NextDouble()
                    s.s3<-s.s2; s.s2<-s.s1
                    s.s1<-v
                    s.sum<-s.s1+s.s2+s.s3
                    printfn "states,sum=%f %f %f %f" s.s1 s.s2 s.s3 s.sum
                    return! loop(s)}
        loop States1)
(* 
states,sum=0.023424 0.000000 0.000000 0.023424
states,sum=0.046045 0.023424 0.000000 0.069468
states,sum=0.068666 0.046045 0.023424 0.138134
states,sum=0.568861 0.068666 0.046045 0.683572
states,sum=0.591482 0.568861 0.068666 1.229009
states,sum=0.614103 0.591482 0.568861 1.774447
states,sum=0.636724 0.614103 0.591482 1.842310
...
*)

The last example is more powerfull. The states are preserved in a class. Objects (as the random one) can be stored and don't need to be recreated in every execution.
Instance and static function can be defined (as fShift and fSum) to be used by the agent.
In other words, the agent defition is encapsulated in a class and its execution is done in the MailboxProcessor.

type StatesClass(s1:float,s2:float,s3:float,status:string)=
    let mutable _s1=s1 
    let mutable _s2=s2 
    let mutable _s3=s3 
    let mutable _status=status 
    let r=new Random()
    let mutable sum= StatesClass.fSum _s1 _s2 _s3  

    member this.Sum with get()=sum and set x=sum<-x
    member this.S1 with get()= _s1 and set x=_s1<-x
    member this.S2 with get()= _s2 and set x=_s2<-x
    member this.S3 with get()= _s3 and set x=_s3<-x
    member this.Status with get()= _status and set x=_status<-x
    member this.rndObj=r    
    static member fSum x y z=x+y+z
    member this.fShift()=
        this.S3<-this.S2; this.S2<-this.S1

let sc1=StatesClass(s1=0.0,s2=0.0,s3=0.0,status="ok")

let Act_3States_2= 
    MailboxProcessor.Start(fun (i:MailboxProcessor) ->
        let rec loop (s:StatesClass) =
            async {
                    let! r= i.Receive()
                    s.fShift()
                    let v=s.rndObj.NextDouble()
                    s.S1<-v
                    s.Sum <- StatesClass.fSum s.S1 s.S2 s.S3
                    printfn "states,sum=%f %f %f %f" s.S1 s.S2 s.S3 s.Sum
                    return! loop(s)}
        loop sc1)
(*
states,sum=0.396605 0.000000 0.000000 0.396605
states,sum=0.029249 0.396605 0.000000 0.425854
states,sum=0.197748 0.029249 0.396605 0.623602
states,sum=0.863481 0.197748 0.029249 1.090479
states,sum=0.641731 0.863481 0.197748 1.702961
states,sum=0.867453 0.641731 0.863481 2.372665
states,sum=0.880071 0.867453 0.641731 2.389254
...
*)

No comments:

Post a Comment