When an actor receives a message, the message is placed in a queue until the agent is ready to process it. When an agent process a message, it makes a decision about what to do with it based on its internal state and contents of the message. It might do processing, send a reply, send a new message for a different agent, etc.
F# provides the MailboxProcessor class for agent processing using Asynchronous Programming or more specific, Asynchronous Workflows.
Asynchronous Programming describes programs and operations that once started are executed in background and terminate at some later time.
Asynchronous Workflow allow you to perform asynchronous operations without the need for explicit callbacks. So you can write the code as if it were using synchronous execution, but actually, the code will execute asynchronously, supending and resuming as asynchronous operation complete.
In Asynchronous Programming, you want to avoid blocking of threads. Threads are expensive because allocate a 1 MB stack and other operational system implications.
In performance critical code, it is important to keep the number of blocked threads low. Ideally, you will only have as many as you have processors and you will have no blocked threads.
There are several topics related to the usage of MailboxProcessor. In this document I will analyze the handling of agent's persisted variables.
The code for the first agent example is:
let Act_RandomAccum= MailboxProcessor.Start(fun (i:MailboxProcessor) -> let rec loop state = async { let! r= i.Receive() let z=new Random() let v=z.NextDouble() printfn "state=%f" (state+v) return! loop(state+v)} loop 0.0) (* state=0.761920 state=1.613361 state=1.717945 state=1.912050 ... *)
Let's create an function that will run continuously and it will post messages to agents every 3 seconds (asynchronously). You can cancel it by executing Async.CancelDefaultToken().
let Act_clk= async { while true do do Act_RandomAccum.Post(1) //do Act_3States.Post(1) //do Act_3States_2.Post(1) do! Async.Sleep(3*1000) } let cancelHandler(ex : OperationCanceledException)= printfn "Task canceled" Async.TryCancelled(Act_clk, cancelHandler) |> Async.StartNotice how variables are keep by being a input to the loop.
This is not covinient when you want to preserve several states.
The next snippet uses a record to preserve several variables. Change the comment lines in Act_clk to send messages to the following agent.
type States= {mutable s1:float; mutable s2:float; mutable s3:float; mutable sum:float; mutable Status: string} let States1={s1=0.0;s2=0.0;s3=0.0;sum=0.0;Status="good"} let Act_Random3States= MailboxProcessor.Start(fun (i:MailboxProcessor) -> let rec loop (s:States) = async { let! r= i.Receive() let z=new Random() let v=z.NextDouble() s.s3<-s.s2; s.s2<-s.s1 s.s1<-v s.sum<-s.s1+s.s2+s.s3 printfn "states,sum=%f %f %f %f" s.s1 s.s2 s.s3 s.sum return! loop(s)} loop States1) (* states,sum=0.023424 0.000000 0.000000 0.023424 states,sum=0.046045 0.023424 0.000000 0.069468 states,sum=0.068666 0.046045 0.023424 0.138134 states,sum=0.568861 0.068666 0.046045 0.683572 states,sum=0.591482 0.568861 0.068666 1.229009 states,sum=0.614103 0.591482 0.568861 1.774447 states,sum=0.636724 0.614103 0.591482 1.842310 ... *)
The last example is more powerfull. The states are preserved in a class. Objects (as the random one) can be stored and don't need to be recreated in every execution.
Instance and static function can be defined (as fShift and fSum) to be used by the agent.
In other words, the agent defition is encapsulated in a class and its execution is done in the MailboxProcessor.
type StatesClass(s1:float,s2:float,s3:float,status:string)= let mutable _s1=s1 let mutable _s2=s2 let mutable _s3=s3 let mutable _status=status let r=new Random() let mutable sum= StatesClass.fSum _s1 _s2 _s3 member this.Sum with get()=sum and set x=sum<-x member this.S1 with get()= _s1 and set x=_s1<-x member this.S2 with get()= _s2 and set x=_s2<-x member this.S3 with get()= _s3 and set x=_s3<-x member this.Status with get()= _status and set x=_status<-x member this.rndObj=r static member fSum x y z=x+y+z member this.fShift()= this.S3<-this.S2; this.S2<-this.S1 let sc1=StatesClass(s1=0.0,s2=0.0,s3=0.0,status="ok") let Act_3States_2= MailboxProcessor.Start(fun (i:MailboxProcessor) -> let rec loop (s:StatesClass) = async { let! r= i.Receive() s.fShift() let v=s.rndObj.NextDouble() s.S1<-v s.Sum <- StatesClass.fSum s.S1 s.S2 s.S3 printfn "states,sum=%f %f %f %f" s.S1 s.S2 s.S3 s.Sum return! loop(s)} loop sc1) (* states,sum=0.396605 0.000000 0.000000 0.396605 states,sum=0.029249 0.396605 0.000000 0.425854 states,sum=0.197748 0.029249 0.396605 0.623602 states,sum=0.863481 0.197748 0.029249 1.090479 states,sum=0.641731 0.863481 0.197748 1.702961 states,sum=0.867453 0.641731 0.863481 2.372665 states,sum=0.880071 0.867453 0.641731 2.389254 ... *)