LAgent: an agent framework in F# – Part X – ActiveObject - Luca Bolognese

Download framework here.

If you stare long enough at agents, you start to realize that they are just 'glorified locks'. They are a convenient programming model to protect a resource from concurrent access. The programming model is convenient because both the client and the server can write their code without worrying about concurrency problems, and yet the program runs in parallel. Protecting a resource sounds a lot like state encapsulation, and the concept of state encapsulation is what object orientation is all about.

So you start wondering whether there is a way to enhance vanilla objects to make them agents. You want to reuse all the concepts that you are familiar with (e.g. inheritance, visibility rules, etc.) and you want your clients to call agents as if they were calling normal objects. Obviously, under the covers, the method calls won't execute immediately; they will be queued. Let's look at an example.

This is our simple counter agent:

type CounterMessage =
| Add of int
| Print
let counterF = fun msg count ->
    match msg with
    | Add(i)    -> count + i
    | Print     -> printfn "The value is %i" count; count
let c1 = spawnAgent counterF 0
c1 <-- Add(3)
c1 <-- Print

As nice as this looks, there are unfamiliar things in this model:

  1. The communication is through messages. This requires packing and unpacking which, albeit easy in F#, is unfamiliar and feels like machinery that we'd like to get rid of.
  2. The management of state is bizarre: it gets passed into the lambda and returned from it instead of being represented as fields and properties on the agent.
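
To make those two points concrete without the framework at hand, here is a minimal sketch of the same counter on top of F#'s built-in MailboxProcessor. It is a stand-in for spawnAgent, not LAgent's implementation; spawnCounter and the Get case are names I made up so that the state can be read back.

```fsharp
// Sketch only: MailboxProcessor stands in for LAgent's spawnAgent.
type CounterMessage =
    | Add of int
    | Print
    | Get of AsyncReplyChannel<int>   // hypothetical extra case, only to read the state back

let spawnCounter (initial: int) =
    MailboxProcessor.Start(fun (inbox: MailboxProcessor<CounterMessage>) ->
        // The state is the loop argument: passed in and handed back on every message.
        let rec loop count = async {
            let! msg = inbox.Receive()
            match msg with
            | Add i -> return! loop (count + i)
            | Print ->
                printfn "The value is %i" count
                return! loop count
            | Get reply ->
                reply.Reply count
                return! loop count }
        loop initial)

let c1 = spawnCounter 0
c1.Post (Add 3)      // packing: the call is turned into a value
c1.Post Print        // unpacking happens in the match above
let current = c1.PostAndReply (fun reply -> Get reply)
```

Both oddities are visible here: the client builds messages instead of calling methods, and count lives only as an argument threaded through loop.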

My best attempt at creating an object-like syntax follows:

type Counter() =
    let w = new WorkQueue()
    let mutable count = 0
    member c.Add x = w.Queue (fun () ->
        count <- count + x
        )
    member c.Print () = w.Queue (fun () ->
        printfn "The value is %i" count
        )
let c = new Counter()
c.Add 3
c.Print ()

With this syntax, you write your agents like you write your vanilla classes, except:

  1. You need a private field of type WorkQueue
  2. You need to write your methods as lambdas passed to the WorkQueue.Queue function
  3. Your methods cannot return values

The most worrisome of these constraints is the second, because you can easily forget about it. If you do forget, then everything compiles just fine, but it doesn't do what you expect. That's pure badness. I haven't found a way to enforce it. This is a place where the language could help me. Other than that, the whole model works rather nicely.
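
Here is a small sketch of that trap. MiniQueue is a hypothetical stand-in for WorkQueue built on MailboxProcessor (an assumption, not the framework's code), and Flush, AddUnprotected and Read are names invented for the illustration.

```fsharp
// Hypothetical stand-in for WorkQueue: a mailbox of lambdas run one at a time.
type MiniQueue() =
    let agent =
        MailboxProcessor.Start(fun (inbox: MailboxProcessor<unit -> unit>) ->
            let rec loop () = async {
                let! work = inbox.Receive()
                work ()
                return! loop () }
            loop ())
    member q.Queue (work: unit -> unit) = agent.Post work
    // Blocks until everything queued before this call has run.
    member q.Flush () =
        agent.PostAndReply(fun (reply: AsyncReplyChannel<unit>) -> (fun () -> reply.Reply ()))

type Counter() =
    let w = MiniQueue()
    let mutable count = 0
    // Correct: the body runs on the queue's single consumer.
    member c.Add x = w.Queue (fun () -> count <- count + x)
    // Wrapper forgotten: compiles the same way, but mutates count on the caller's thread.
    member c.AddUnprotected x = count <- count + x
    member c.Read () =
        w.Flush ()
        count

let counter = Counter()
// 1000 concurrent callers, all going through the queue.
Array.Parallel.iter (fun _ -> counter.Add 1) [| 1 .. 1000 |]
let total = counter.Read ()
printfn "Queued adds: %i" total
```

Going through Add, the total is always 1000. Swap in AddUnprotected and the signature is identical, nothing warns you, and parallel callers can lose updates.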

Regarding the third point, you can concoct a programming model that allows you to return values from your methods. Here it is:

member c.CountTask = w.QueueWithTask(fun () ->
    count
    )
member c.CountAsync = w.QueueWithAsync(fun () ->
    count
    )
printfn "The count using Task is %i" (c.CountTask.Result)

The first method returns a Task; the second method returns an Async backed by an AsyncResultCell. Both are ways to represent a promise. The latter allows a natural integration with the async block in F#, as in the following code:

Async.RunSynchronously (
            async {
                let! count = c.CountAsync
                printfn "The count using Async is %i" count
            })
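
If you want to try the two flavours of promise without the framework, the following is a self-contained sketch. MiniQueue is again a hypothetical stand-in built on MailboxProcessor, and its QueueWithAsync goes through Async.AwaitTask rather than the AsyncResultCell that LAgent uses, so treat it as an approximation.

```fsharp
open System.Threading.Tasks

// Hypothetical stand-in for WorkQueue, with the two value-returning variants.
type MiniQueue() =
    let agent =
        MailboxProcessor.Start(fun (inbox: MailboxProcessor<unit -> unit>) ->
            let rec loop () = async {
                let! work = inbox.Receive()
                work ()
                return! loop () }
            loop ())
    member q.Queue (work: unit -> unit) = agent.Post work
    member q.QueueWithTask (f: unit -> 'T) : Task<'T> =
        let source = TaskCompletionSource<'T>()
        agent.Post (fun () ->
            // Complete the promise from the queue's thread, success or failure.
            try source.SetResult (f ())
            with ex -> source.SetException(ex))
        source.Task
    // Note: the work is queued when this member is called, not when the Async is run.
    member q.QueueWithAsync (f: unit -> 'T) : Async<'T> =
        q.QueueWithTask f |> Async.AwaitTask

type Counter() =
    let w = MiniQueue()
    let mutable count = 0
    member c.Add x = w.Queue (fun () -> count <- count + x)
    member c.CountTask = w.QueueWithTask (fun () -> count)
    member c.CountAsync = w.QueueWithAsync (fun () -> count)

let c = Counter()
c.Add 3
c.Add 4
let viaTask = c.CountTask.Result
let viaAsync = c.CountAsync |> Async.RunSynchronously
printfn "Task: %i, Async: %i" viaTask viaAsync
```

Because the queue is first in, first out, both reads run after the two Adds and see 7.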

As for myself, I don't like methods returning values. Every time I use them, I end up going back and thinking about my problem in a traditional way, that is, as method calls that return results, instead of thinking about it in a more actor-oriented fashion. I end up waiting for these promises to be materialized and, by doing so, I limit the amount of parallelism that I unleash. As a matter of fact, the whole business of hiding the message passing nature of the programming model is dubious. It makes for a nicer syntax, but you need to make an extra effort in your mind to translate it to what it really is: just message passing with a nice syntactical veneer. I haven't decided yet which model I like the most.
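
One way to stay in the actor-oriented mindset is to hand the value to whoever needs it instead of returning it. The sketch below is only an illustration under my own assumptions: MiniQueue stands in for WorkQueue, and Recorder, SendCountTo, Snapshot and Flush are invented names.

```fsharp
// Hypothetical stand-in for WorkQueue.
type MiniQueue() =
    let agent =
        MailboxProcessor.Start(fun (inbox: MailboxProcessor<unit -> unit>) ->
            let rec loop () = async {
                let! work = inbox.Receive()
                work ()
                return! loop () }
            loop ())
    member q.Queue (work: unit -> unit) = agent.Post work
    // Blocks until everything queued before this call has run (used only by the demo).
    member q.Flush () =
        agent.PostAndReply(fun (reply: AsyncReplyChannel<unit>) -> (fun () -> reply.Reply ()))

// A second active object that receives the values.
type Recorder() =
    let w = MiniQueue()
    let seen = System.Collections.Generic.List<int>()
    member r.Record (value: int) = w.Queue (fun () -> seen.Add value)
    member r.Snapshot () =
        w.Flush ()
        List.ofSeq seen

type Counter() =
    let w = MiniQueue()
    let mutable count = 0
    member c.Add x = w.Queue (fun () -> count <- count + x)
    // No promise: the count is forwarded to the receiver as another queued call.
    member c.SendCountTo (receiver: int -> unit) = w.Queue (fun () -> receiver count)
    member c.Flush () = w.Flush ()

let recorder = Recorder()
let c = Counter()
c.Add 3
c.SendCountTo (fun n -> recorder.Record n)
c.Add 4
c.SendCountTo (fun n -> recorder.Record n)
// The caller never waited up to this point; the flushes exist only to inspect the result.
c.Flush ()
let observed = recorder.Snapshot ()
printfn "%A" observed
```

The recorder ends up with [3; 7], and the client code never blocked on a result along the way.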

By now you should have a sense of what WorkQueue is. In essence, it is a Mailbox of lambdas (look at the workQueue definition and the Queue member in the code below).

type WorkQueue() =
    let workQueue = spawnWorker (fun f -> f())
    member w.Queue (f) = workQueue <-- f
    member w.QueueWithTask f : Task<'T> =
        let source = new TaskCompletionSource<_>()
        workQueue <-- (fun () -> f() |> source.SetResult)
        source.Task
    member w.QueueWithAsync (f:unit -> 'T) : Async<'T> =
        let result = new AsyncResultCell<'T>()
        workQueue <-- (fun () -> f() |> result.RegisterResult )
        result.AsyncWaitResult
    member w.Restart () = workQueue <-! Restart
    member w.Stop () = workQueue <-! Stop
    member w.SetErrorHandler(h) =
        let managerF = fun (_, name:string, ex:Exception, _, _, _) -> h name ex
        let manager = spawnWorker managerF
        workQueue <-! SetManager manager
    member w.SetName(name) = workQueue <-! SetName(name)
    member w.SetQueueHandler(g) = workQueue <-! SetWorkerHandler(g)
    member w.SetTimeoutHandler(timeout, f) = workQueue <-! SetTimeoutHandler(timeout, f)
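
To see the 'mailbox of lambdas' idea and the error-handling service in isolation, here is a sketch on top of MailboxProcessor. It is an assumption about how such a queue could behave, not the framework's code: LAgent routes errors to a separate manager agent, while this MiniQueue simply calls a handler function, and Flush is an invented helper.

```fsharp
// Hypothetical stand-in for WorkQueue with a much simplified error handler.
type MiniQueue() =
    let mutable onError : exn -> unit = ignore
    // Run one queued lambda; a failure goes to the handler instead of killing the loop.
    let run (work: unit -> unit) =
        try work ()
        with ex -> onError ex
    let agent =
        MailboxProcessor.Start(fun (inbox: MailboxProcessor<unit -> unit>) ->
            let rec loop () = async {
                let! work = inbox.Receive()
                run work
                return! loop () }
            loop ())
    member q.Queue (work: unit -> unit) = agent.Post work
    // The handler is swapped through the queue itself, so it never races with running work.
    member q.SetErrorHandler (handler: exn -> unit) =
        agent.Post (fun () -> onError <- handler)
    // Blocks until everything queued before this call has run.
    member q.Flush () =
        agent.PostAndReply(fun (reply: AsyncReplyChannel<unit>) -> (fun () -> reply.Reply ()))

let log = System.Collections.Generic.List<string>()
let q = MiniQueue()
q.SetErrorHandler (fun ex -> log.Add ("error: " + ex.Message))
q.Queue (fun () -> failwith "boom")
q.Queue (fun () -> log.Add "still running")
q.Flush ()
let entries = List.ofSeq log
printfn "%A" entries
```

The failing lambda is reported and the next one still runs, so entries is ["error: boom"; "still running"].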

I implemented all the services that are in the message passing model. The two are equivalent as far as expressive power goes. In case you wonder what a real piece of code looks like using this model, here is an ActiveObject version of the map reduce algorithm. One of these days, I will gather the strength to go through this code and explain what it does, but not today 🙂

#load "AgentSystem.fs"
open AgentSystem.LAgent
open System
open System.Collections
open System.Collections.Generic
open System.Threading
type IOutput<'out_key, 'out_value> =
    abstract Reduced: 'out_key -> seq<'out_value> -> unit
    abstract MapReduceDone: unit -> unit
type Mapper<'in_key, 'in_value, 'my_out_key, 'out_value when 'my_out_key : comparison>(map:'in_key -> 'in_value -> seq<'my_out_key * 'out_value>, i, partitionF) =
    let w = new WorkQueue()
    let mutable reducerTracker: BitArray = null
    let mutable controller = Unchecked.defaultof<Controller<'in_key, 'in_value, 'my_out_key, 'out_value>>
    let mutable reducers = Unchecked.defaultof<Reducer<'in_key, 'in_value, 'my_out_key, 'out_value> array>
    member m.Init c reds =
        w.Queue (fun () ->
            controller <- c
            reducers <- reds
            reducerTracker <- new BitArray(reducers.Length, false))
    member m.Process inKey inValue =
        w.Queue (fun () ->
            let outKeyValues = map inKey inValue
            outKeyValues
            |> Seq.iter (fun (outKey, outValue) ->
                let reducerUsed = partitionF outKey (reducers.Length)
                reducerTracker.Set(reducerUsed, true)
                reducers.[reducerUsed].Add(outKey, outValue)))
    member m.Done () =
        w.Queue (fun () ->
            controller.MapDone i reducerTracker)
    member m.Stop () = w.Stop ()
and Reducer<'in_key, 'in_value, 'out_key, 'out_value when 'out_key : comparison>(reduce:'out_key -> seq<'out_value> -> seq<'out_value>, i, output:IOutput<'out_key, 'out_value>) =
    let w = new WorkQueue()
    let mutable workItems = new List<'out_key * 'out_value>()
    let mutable controller = Unchecked.defaultof<Controller<'in_key, 'in_value, 'out_key, 'out_value>>
    member r.Init c =
        w.Queue (fun () ->
            controller <- c)
    member r.StartReduction () =
        w.Queue (fun () ->
            workItems
            |> Seq.groupBy fst
            |> Seq.sortBy fst
            |> Seq.map (fun (key, values) -> (key, reduce key (values |> Seq.map snd)))
            |> Seq.iter (fun (key, value) -> output.Reduced key value)
            controller.ReductionDone i)
    member r.Add (outKey:'out_key, outValue:'out_value) : unit =
        w.Queue (fun () ->
            workItems.Add((outKey, outValue)))
    member m.Stop () = w.Stop ()
and Controller<'in_key, 'in_value, 'out_key, 'out_value when 'out_key : comparison>(output:IOutput<'out_key, 'out_value>) =
    let w = new WorkQueue()
    let mutable mapperTracker: BitArray = null
    let mutable reducerUsedByMappers: BitArray = null
    let mutable reducerDone: BitArray = null
    let mutable mappers = Unchecked.defaultof<Mapper<'in_key, 'in_value, 'out_key, 'out_value> array>
    let mutable reducers = Unchecked.defaultof<Reducer<'in_key, 'in_value, 'out_key, 'out_value> array>
    let BAtoSeq (b:BitArray) = [for x in b do yield x]
    member c.Init maps reds =
        w.Queue (fun () ->
            mappers <- maps
            reducers <- reds
            mapperTracker <- new BitArray(mappers.Length, false)
            reducerUsedByMappers <- new BitArray(reducers.Length, false)
            reducerDone <- new BitArray(reducers.Length, false))
    member c.MapDone (i : int) (reducerTracker : BitArray) : unit =
        w.Queue (fun () ->
            mapperTracker.Set(i, true)
            let reducerUsedByMappers = reducerUsedByMappers.Or(reducerTracker)
            if not( BAtoSeq mapperTracker |> Seq.exists(fun bit -> bit = false)) then
                BAtoSeq reducerUsedByMappers |> Seq.iteri (fun i r -> if r = true then reducers.[i].StartReduction ())
                mappers |> Seq.iter (fun m -> m.Stop ())
            )
    member c.ReductionDone (i: int) : unit =
        w.Queue (fun () ->
            reducerDone.Set(i, true)
            if BAtoSeq reducerDone |> Seq.forall2 (fun x y -> x = y) (BAtoSeq reducerUsedByMappers) then
                output.MapReduceDone ()
                reducers |> Seq.iter (fun r -> r.Stop ())
                c.Stop()
            )
    member m.Stop () = w.Stop ()

let mapReduce   (inputs:seq<'in_key * 'in_value>)
                (map:'in_key -> 'in_value -> seq<'out_key * 'out_value>)
                (reduce:'out_key -> seq<'out_value> -> seq<'out_value>)
                (output:IOutput<'out_key, 'out_value>)
                M R partitionF =
    let len = inputs |> Seq.length
    let M = if len < M then len else M
    let mappers = Array.init M (fun i -> new Mapper<'in_key, 'in_value, 'out_key, 'out_value>(map, i, partitionF))
    let reducers = Array.init R (fun i -> new Reducer<'in_key, 'in_value, 'out_key, 'out_value>(reduce, i, output))
    let controller = new Controller<'in_key, 'in_value, 'out_key, 'out_value>(output)
    mappers |> Array.iter (fun m -> m.Init controller reducers)
    reducers |> Array.iter (fun r -> r.Init controller)
    controller.Init mappers reducers
    inputs |> Seq.iteri (fun i (inKey, inValue) -> mappers.[i % M].Process inKey inValue)
    mappers |> Seq.iter (fun m -> m.Done ())

let partitionF = fun key M -> abs(key.GetHashCode()) % M
let map = fun (fileName:string) (fileContent:string) ->
            let l = new List<string * int>()
            let wordDelims = [|' ';',';';';'.';':';'?';'!';'(';')';'\n';'\t';'\f';'\r';'\b'|]
            fileContent.Split(wordDelims) |> Seq.iter (fun word -> l.Add((word, 1)))
            l :> seq<string * int>
let reduce = fun key (values:seq<int>) -> [values |> Seq.sum] |> seq<int>
let printer () =
    { new IOutput<string, int> with
        member o.Reduced key values = printfn "%A %A" key values
        member o.MapReduceDone () = printfn "All done!!"}
let testInput =
    ["File1", "I was going to the airport when I saw someone crossing"; "File2", "I was going home when I saw you coming toward me"]
mapReduce testInput map reduce (printer ()) 2 2 partitionF

open System.IO
open System.Text
let gatherer(step) =
    let w = new WorkQueue()
    let data = new List<string * int>()
    let counter = ref 0
    { new IOutput<string, int> with
        member o.Reduced key values =
            w.Queue (fun () ->
                if !counter % step = 0 then
                    printfn "Processed %i words. Now processing %s" !counter key
                data.Add((key, values |> Seq.hd))
                counter := !counter + 1)
        member o.MapReduceDone () =
            w.Queue (fun () ->
                data
                |> Seq.distinctBy (fun (key, _) -> key.ToLower())
                |> Seq.filter (fun (key, _) -> not(key = "" || key = "\"" || (fst (Double.TryParse(key)))))
                |> Seq.to_array
                |> Array.sortBy snd
                |> Array.rev
                |> Seq.take 20
                |> Seq.iter (fun (key, value) -> printfn "%A\t\t%A" key value)
                printfn "All done!!")
    }
let splitBook howManyBlocks fileName =
    let buffers = Array.init howManyBlocks (fun _ -> new StringBuilder())
    fileName
    |> File.ReadAllLines
    |> Array.iteri (fun i line -> buffers.[i % (howManyBlocks)].Append(line) |> ignore)
    buffers
let blocks1 = __SOURCE_DIRECTORY__ + "\\kjv10.txt" |> splitBook 100
let blocks2 = __SOURCE_DIRECTORY__ + "\\warandpeace.txt" |> splitBook 100
let input =
    blocks1
    |> Array.append blocks2
    |> Array.mapi (fun i b -> i.ToString(), b.ToString())
//mapReduce input map reduce (gatherer(1000)) 20 20 partitionF

type BookSplitter () =
    let blocks = new List<string * string>()
    member b.Split howManyBlocks fileName =
        let b =
            fileName
            |> splitBook howManyBlocks
            |> Array.mapi (fun i b -> i.ToString(), b.ToString())
        blocks.AddRange(b)
    member b.Blocks () =
        blocks.ToArray() :> seq<string * string>

type WordCounter () =
    let w = new WorkQueue()
    let words = new Dictionary<string,int>()
    let worker(wordCounter:WordCounter, ev:EventWaitHandle) =
        let w1 = new WorkQueue()
        { new IOutput<string, int> with
            member o.Reduced key values =
                w1.Queue (fun() ->
                    wordCounter.AddWord key (values |> Seq.hd))
            member o.MapReduceDone () =
                w1.Queue(fun () ->
                    ev.Set() |> ignore)
        }
    member c.AddWord word count =
        let exist, value = words.TryGetValue(word)
        if exist then
            words.[word] <- value + count
        else
            words.Add(word, count)
    member c.Add fileName =
        w.Queue (fun () ->
            let s = new BookSplitter()
            fileName |> s.Split 100
            let ev = new EventWaitHandle(false, EventResetMode.AutoReset)
            let blocks = s.Blocks ()
            mapReduce blocks map reduce (worker(c, ev)) 20 20 partitionF
            ev.WaitOne() |> ignore
            )
    member c.Words =
        w.QueueWithAsync (fun () ->
            words |> Seq.to_array |> Array.map (fun kv -> kv.Key, kv.Value)
            )

let wc = new WordCounter()
wc.Add (__SOURCE_DIRECTORY__ + "\\kjv10.txt")
wc.Add (__SOURCE_DIRECTORY__ + "\\warandpeace.txt")
let wordsToPrint = async {
    let! words = wc.Words
    return
        words
        |> Seq.distinctBy (fun (key, _) -> key.ToLower())
        |> Seq.filter (fun (key, _) -> not(key = "" || key = "\"" || (fst (Double.TryParse(key)))))
        |> Seq.to_array
        |> Array.sortBy snd
        |> Array.rev
        |> Seq.take 20
        |> Seq.iter (fun (key, value) -> printfn "%A\t\t%A" key value)}
Async.RunSynchronously wordsToPrint
Thread.Sleep(15000)
printfn "Closed session"
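
As a reference point for what the word count above is supposed to produce, here is a sequential sketch of the same map and reduce steps with no agents at all. It is not the code above: mapWords, reduceCounts and wordCount are names made up for the illustration, the delimiter list is shortened, and empty entries are dropped while splitting.

```fsharp
let wordDelims = [| ' '; ','; ';'; '.'; ':'; '?'; '!'; '('; ')'; '\n'; '\t'; '\r' |]

// map step: one (word, 1) pair per word in the text
let mapWords (_fileName: string) (content: string) =
    content.Split(wordDelims, System.StringSplitOptions.RemoveEmptyEntries)
    |> Seq.map (fun word -> word, 1)

// reduce step: total of all the counts emitted for one word
let reduceCounts (_word: string) (counts: seq<int>) = Seq.sum counts

// Sequential pipeline: map every input, group the pairs by key, reduce each group.
let wordCount (inputs: seq<string * string>) =
    inputs
    |> Seq.collect (fun (name, text) -> mapWords name text)
    |> Seq.groupBy fst
    |> Seq.map (fun (word, pairs) -> word, reduceCounts word (Seq.map snd pairs))
    |> Map.ofSeq

let counts =
    wordCount [ ("File1", "I was going to the airport when I saw someone crossing")
                ("File2", "I was going home when I saw you coming toward me") ]
printfn "I: %i, going: %i, airport: %i" counts.["I"] counts.["going"] counts.["airport"]
```

On the two test sentences this gives 4 for "I", 2 for "going" and 1 for "airport". The agent version spreads the map step over the mappers and the grouping and reduction over the reducers, with the controller tracking when each phase is done.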