Lesson 3: Dynamic reconfiguration

In this lesson, we show how to add processes to, and remove processes from, a system during execution. To illustrate this, we use a simple graphical application which displays a moving point.

We define the state of a moving point by a record containing the color of the point, and two signals that represent respectively the flows of positions and velocities of the point.

type state =
    { color: Graphics.color;
      pos: (float * float, float * float) event;
      vel: (float * float, float * float) event; }

At each instant, the position and velocity of a point are represented by a pair of floats.

We now need a way to observe the moving points. To do so, we define a global signal to_draw on which points will send their states.

signal to_draw;;

Then we define a process called window which opens the graphical window and displays the points.

let process window =
  Graphics.open_graph "";
  loop
    (* all is the list of states emitted on to_draw during the instant *)
    await to_draw (all) in
    Graphics.clear_graph ();
    List.iter
      (fun state ->
        let x, y = last ?state.pos in
        let x_int = (truncate x) mod (Graphics.size_x()) in
        let y_int = truncate y in
        Graphics.set_color state.color;
        Graphics.fill_circle x_int y_int 5)
      all
  end

#run window;;
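
The conversion from a float position to pixel coordinates can be checked in isolation. The following is a minimal plain OCaml sketch; to_pixel is a hypothetical helper that does not appear in the lesson, and its width parameter stands in for Graphics.size_x ().

```ocaml
(* Hypothetical helper mirroring the conversion done in window:
   x is truncated and wrapped with mod, y is only truncated. *)
let to_pixel width (x, y) = ((truncate x) mod width, truncate y)

(* A point that has moved past the right edge of a 600-pixel-wide window. *)
let wrapped = to_pixel 600 (642.7, 300.2)

(* mod keeps the sign of its left operand, so a negative x is not
   wrapped back into the window. *)
let negative = to_pixel 600 (-3.5, 0.)
```

With these inputs, wrapped is (42, 300): the point re-enters on the left side. The second case suggests that the wrap-around only behaves as intended for points moving to the right, which is the case for the behaviors defined in this lesson.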

To create a value of type state, we need some auxiliary functions.

We define the operator +: which adds two pairs of floats componentwise.

let (+:) (x1, y1) (x2, y2) = (x1 +. x2, y1 +. y2)

We define a function color_of_int which converts an integer into a value of type Graphics.color.

let color_of_int = function
  | 0 -> Graphics.black
  | 1 -> Graphics.magenta
  | 2 -> Graphics.green
  | 3 -> Graphics.red
  | 4 -> Graphics.blue
  | _ -> Graphics.black

We can now define a function which creates a new state.

let new_state () =
  signal pos default (0., 0.) gather (+:) in
  signal vel default (0., 0.) gather (+:) in
  emit pos (0., float (Graphics.size_y () / 2));
  { color = (color_of_int (Random.int 5));
    pos = pos;
    vel = vel; }

The flow of positions is initialized on the left side of the graphical window by an emission on the pos signal.
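
The gather (+:) clause determines what happens when a signal is emitted several times during the same instant: the emitted values are combined with +:, starting from the default value. The plain OCaml sketch below models this with a fold. combine_emissions is a hypothetical helper, not a ReactiveML function, and the order in which values are combined is an assumption of the sketch.

```ocaml
(* Componentwise addition of two pairs of floats, as in the lesson. *)
let ( +: ) (x1, y1) (x2, y2) = (x1 +. x2, y1 +. y2)

(* Hypothetical model of a signal declared with "default d gather f":
   fold the values emitted during one instant, starting from d. *)
let combine_emissions default gather emissions =
  List.fold_left (fun acc v -> gather v acc) default emissions

(* One emission during the instant. *)
let one_emission = combine_emissions (0., 0.) ( +: ) [ (2., 0.) ]

(* Two emissions during the same instant are summed. *)
let two_emissions = combine_emissions (0., 0.) ( +: ) [ (2., 0.); (0., 2.) ]
```

Here two_emissions is (2., 2.): two processes emitting a velocity on the same signal in the same instant yield the sum of their velocities.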

To compute the position of the point, we integrate the velocity.

let process compute_pos state =
  loop
    let p = last ?state.pos +: last ?state.vel in
    emit state.pos p;
    pause
  end

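To move a point from left to right, we define a process which emits the same velocity at each instant. The emission must be repeated because the vel signal falls back to its default value (0., 0.) in any instant where nothing is emitted.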

let process left_right state =
  loop emit state.vel (2.0, 0.); pause end
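
Taken together, compute_pos and left_right amount to adding the velocity (2.0, 0.) to the position once per instant. The plain OCaml sketch below simulates a few instants; integrate is a hypothetical helper and the starting position (0., 100.) is an arbitrary stand-in for the left side of the window.

```ocaml
(* Componentwise addition of two pairs of floats, as in the lesson. *)
let ( +: ) (x1, y1) (x2, y2) = (x1 +. x2, y1 +. y2)

(* Hypothetical helper: apply "pos <- pos +: vel" once per simulated instant. *)
let rec integrate pos vel instants =
  if instants <= 0 then pos
  else integrate (pos +: vel) vel (instants - 1)

(* Position after three instants at constant velocity (2., 0.). *)
let after_three = integrate (0., 100.) (2., 0.) 3
```

After three instants the point has moved six units to the right and its vertical coordinate is unchanged.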

To observe the position of a point, we define a process draw which emits the state of a point at each instant.

let process draw state =
  loop
    emit to_draw state; pause
  end

The behavior of a moving point is the parallel composition of the three preceding processes.

let process moving_point state =
   run (compute_pos state)
   ||
   run (left_right state)
   ||
   run (draw state)

#run moving_point (new_state ());;

The created process cannot be removed since it never terminates. In the next step, we will see how to implement a simple process manager that can stop the execution of a process in a modular way.

We first define a function generating a unique identifier for each process.

let gen_id =
  let cpt = ref 0 in fun () -> incr cpt; !cpt
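
As a quick check of gen_id, the plain OCaml sketch below calls it twice. The names first and second are only for illustration.

```ocaml
(* gen_id as defined in the lesson: the counter cpt is private to the closure. *)
let gen_id =
  let cpt = ref 0 in fun () -> incr cpt; !cpt

(* Each call increments the counter and returns its new value. *)
let first = gen_id ()
let second = gen_id ()
```

The first call returns 1 and the second returns 2, so ids are handed out in the order in which the processes that request them start.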

We define a global signal to_kill on which we will send the id of the processes to kill.

signal to_kill;;

We define a process killable such that killable p associates an id with the process p given as argument and executes it. When the id associated with p is emitted on to_kill, the execution of p is stopped.

let process killable p =
  let id = gen_id () in
  print_endline ("["^(string_of_int id)^"]");
  do run p
  until to_kill(ids) when List.mem id ids done

The identifier id is generated, then the process p is executed. If the signal to_kill is emitted during the execution, the variable ids is bound to the list of ids emitted during that instant (to_kill is declared without a gather function, so its emitted values are collected in a list). Thanks to the do/until/when construct, the execution is stopped if id belongs to this list (List.mem id ids); otherwise it continues.
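
The guard of the do/until/when construct is an ordinary boolean expression, so it can be tried out in plain OCaml. In the sketch below, should_stop is a hypothetical name for the test List.mem id ids.

```ocaml
(* Hypothetical name for the guard used in killable. *)
let should_stop id ids = List.mem id ids

(* Process 1 stops when its own id is among the emitted ids... *)
let killed = should_stop 1 [ 1 ]

(* ...and keeps running when only other ids were emitted. *)
let spared = should_stop 1 [ 3; 4 ]
```

Because the guard looks at the whole list, several processes can be killed in the same instant by emitting several ids on to_kill.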

We can test our new combinator with a moving point.

#run killable (moving_point (new_state ()));;

Observe that the id [1] is printed in the terminal. Therefore, we can kill this process by sending its id on the signal to_kill.

emit to_kill 1;;

Let's now see how we can change the behavior of a process. We define a process replace which takes as arguments an initial behavior p_init, a state state, and a signal new_behavior on which a behavior to replace the current one can be sent.

let rec process replace p_init state new_behavior =
  do
    run (p_init state)
  until new_behavior(p) ->
    run (replace p state new_behavior)
  done

The process p_init parameterized by state is executed under the control of the signal new_behavior. When new_behavior is emitted, p_init is stopped and we receive the process p carried on this signal. The new process p is executed through the recursive call to replace, which continues to allow changes to the behavior of the process.

To use this process, we need a signal on which we can send and receive a process. To handle multiple emissions during the same instant, we build the parallel composition of the sent processes.

signal new_behavior
  default (fun state -> process ())
  gather (fun p q state ->
            process (run (p state) || run (q state)))
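
The effect of this declaration on simultaneous emissions can be modelled in plain OCaml. The sketch below is only a model under stated assumptions: a behavior is represented as a function returning the list of actions it would start, not as a real process, and in_parallel, no_behavior and the string labels are hypothetical stand-ins.

```ocaml
(* Stand-in for the default value "fun state -> process ()": starts nothing. *)
let no_behavior = fun _state -> []

(* Stand-in for the gather function: both behaviors are started. *)
let in_parallel p q = fun state -> p state @ q state

(* Two toy behaviors that only report what they would run. *)
let left_right state = [ "left_right " ^ state ]
let up_down state = [ "up_down " ^ state ]

(* Fold the behaviors emitted during one instant, starting from the default. *)
let merged =
  List.fold_left (fun acc p -> in_parallel p acc) no_behavior
    [ left_right; up_down ]

let started = merged "point"
```

In this model, started contains both behaviors: two emissions in the same instant are merged into one behavior that runs both, rather than one emission overriding the other.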

We can now execute a moving point whose behavior can be replaced dynamically.

#run replace moving_point (new_state ()) new_behavior;;

Now, to replace the point moving from left to right, we define the behavior of a point moving up and down.

let process up_down state =
  for i = 1 to 50 do emit state.vel (0., 2.); pause done;
  loop
    for i = 1 to 100 do emit state.vel (0., -. 2.); pause done;
    for i = 1 to 100 do emit state.vel (0., 2.); pause done
  end

let process moving_point' state =
   run (up_down state) ||
   run (compute_pos state) || run (draw state)

To reconfigure the previous point, we need only send this moving_point' process on the signal new_behavior.

emit new_behavior moving_point';;

We can restore the previous behavior as follows.

emit new_behavior moving_point;;

Another useful reconfiguration combinator is one that adds a new behavior to a running process. To do that, we define a process extend that executes a process p_init and awaits new processes to execute on a signal add_behavior. The initial process p_init and the added processes will share a common state state.

let rec process extend p_init state add_behavior =
  run (p_init state)
  ||
  await add_behavior (p) in
  run (extend p state add_behavior)

In this process, p_init is executed and in parallel a process p is awaited on the signal add_behavior. When p is received, it is executed through a recursive call to extend so that it is still possible to add new behaviors.

Similarly to new_behavior, we define a signal add_behavior on which the adding requests will be sent.

signal add_behavior
  default (fun state -> process ())
  gather (fun p q state ->
            process (run (p state) || run (q state)))

We can run an extensible moving point,

#run extend moving_point (new_state ()) add_behavior ;;

and add an up/down behavior to this point moving left to right.

emit add_behavior up_down ;;

Of course, we can combine the previous operators. For example, we can define a process that can be extended and killed.

#run killable
   (extend moving_point (new_state ()) add_behavior) ;;

Then we can reconfigure this process

emit add_behavior up_down ;;

and kill it.

emit to_kill 2 ;;

We can also combine the operators in another way. First we run an extensible process.

#run extend moving_point (new_state ()) add_behavior ;;

Then we add a behavior which is killable.

emit add_behavior
  (fun state -> process (run (killable (up_down state)))) ;;

You can see that the behavior of two moving points has been modified, because two processes are waiting on the signal add_behavior. Each of the two processes executes a new killable behavior in parallel with its previous behavior, and thus the ids of two killable processes are printed in the terminal. We can now kill these processes.

emit to_kill 3 ;;

emit to_kill 4 ;;

Note that the two moving points that were modified still exist because we only killed the added behaviors.

To prevent multiple processes from sharing the same reconfiguration signal, we can associate an id with each extensible process and dispatch the requests.

The adding requests will be sent on the signal to_add with the id of the process to extend.

signal to_add ;;

Each extensible process has its own add_behavior signal and filters the to_add signal to extract only those requests addressed to it.

let process extensible p_init state =
  let id = gen_id () in
  print_endline ("{"^(string_of_int id)^"}");
  signal add_behavior
    default (fun state -> process ())
    gather (fun p q state -> process (run (p state) || run (q state)))
  in
  run (extend p_init state add_behavior)
  ||
  loop
    await to_add(reqs) in
    List.iter
      (fun (x, p) -> if x = id then emit add_behavior p)
      reqs
  end
We can test our new combinator as follows.

#run extensible moving_point (new_state ()) ;;

emit to_add (5, up_down) ;;

A small process-management library is available in the standard library: rml_process_manager.rmli. For more information about dynamic reconfiguration in ReactiveML, you can read this paper.