Concurrent Processes
With an application composed of many concurrent processes, we lose
the convenience offered by the determinism of sequential programs.
For processes sharing the same zone of memory, the result of the
following program cannot be deduced from reading it.
main program
    let x = ref 1 ;;

process P              |  process Q
    x := !x + 1 ;;     |      x := !x * 2 ;;
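At the end of the execution of P and Q, the reference x
can contain 2, 3 or 4, depending on the order of execution of each
process: 4 if P runs entirely before Q, 3 if Q runs entirely before P,
and 2 if both processes read x before either one writes it back.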
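The three possible results can be checked by enumeration. The following is a minimal sketch, not a threaded program: it assumes that each assignment decomposes into two atomic steps (read x into a private register, then write the new value), and it simulates every interleaving of those steps. The names who, step, interleavings and run are ours.

```ocaml
(* Simulation only: no threads are created here. *)
type who = P | Q
type step = Read | Write

(* All merges of two step lists that preserve the order inside each list. *)
let rec interleavings p q =
  match p, q with
  | [], rest | rest, [] -> [ rest ]
  | a :: p', b :: q' ->
      List.map (fun l -> a :: l) (interleavings p' q)
      @ List.map (fun l -> b :: l) (interleavings p q')

(* Execute one schedule, starting from x = 1. *)
let run schedule =
  let x = ref 1 and reg_p = ref 0 and reg_q = ref 0 in
  List.iter
    (function
      | P, Read -> reg_p := !x
      | P, Write -> x := !reg_p + 1   (* P: x := !x + 1 *)
      | Q, Read -> reg_q := !x
      | Q, Write -> x := !reg_q * 2)  (* Q: x := !x * 2 *)
    schedule;
  !x

(* Distinct final values of x over the six possible schedules. *)
let outcomes =
  interleavings [ (P, Read); (P, Write) ] [ (Q, Read); (Q, Write) ]
  |> List.map run
  |> List.sort_uniq compare

let () =
  List.iter (Printf.printf "%d ") outcomes;
  print_newline ()
```

Under this two-step assumption, outcomes is the list [2; 3; 4].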
This nondeterminism also applies to termination. When the memory
state depends on the order in which the parallel processes execute, an
application can fail to terminate on one execution and
terminate on another. To provide some control over the execution, the
processes must be synchronized.
For processes that use distinct memory areas but communicate with
each other, the interaction depends on the type of
communication. For the following example we introduce two
communication primitives: send, which sends a value to a
named destination, and receive, which receives a
value from a named process. Let P and Q be two communicating
processes:
process P                    |  process Q
    let x = ref 1 ;;         |      let y = ref 1 ;;
    send(Q,!x);              |      y := !y + 3 ;
    x := !x * 2 ;            |      y := !y + receive(P);
    send(Q,!x);              |      send(P,!y);
    x := !x + receive(Q);    |      y := !y + receive(P);
In the case of a transient communication, process Q can miss
the messages of P. We fall back into the non-determinism of the
preceding model.
For an asynchronous communication, the medium of the communication
channel stores the different values that have been transmitted. Only
reception is blocking. Process P can be waiting for a message from Q
even if Q has not yet read the two messages from P. This
does not prevent Q from transmitting.
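The asynchronous case can be sketched with the thread library. This is an illustration under assumptions, not the primitives of the text: each receiving process gets an unbounded FIFO mailbox built from Queue, Mutex and Condition, and send takes the destination's mailbox instead of a process name. The names mailbox, make_mailbox, to_p and to_q are ours.

```ocaml
(* Hypothetical mailbox: send never blocks, receive blocks while empty. *)
type 'a mailbox = { q : 'a Queue.t; m : Mutex.t; c : Condition.t }

let make_mailbox () =
  { q = Queue.create (); m = Mutex.create (); c = Condition.create () }

let send mb v =
  Mutex.lock mb.m;
  Queue.add v mb.q;          (* the medium stores the value *)
  Condition.signal mb.c;     (* wake up a waiting receiver, if any *)
  Mutex.unlock mb.m

let receive mb =
  Mutex.lock mb.m;
  while Queue.is_empty mb.q do
    Condition.wait mb.c mb.m (* only reception is blocking *)
  done;
  let v = Queue.take mb.q in
  Mutex.unlock mb.m;
  v

let to_q : int mailbox = make_mailbox ()  (* messages from P to Q *)
let to_p : int mailbox = make_mailbox ()  (* messages from Q to P *)

let x = ref 1
let y = ref 1

let p () =
  send to_q !x;
  x := !x * 2;
  send to_q !x;
  x := !x + receive to_p

let q () =
  y := !y + 3;
  y := !y + receive to_q;
  send to_p !y;
  y := !y + receive to_q

let () =
  let tp = Thread.create p () in
  let tq = Thread.create q () in
  Thread.join tp;
  Thread.join tq;
  Printf.printf "x = %d, y = %d\n" !x !y
```

With these FIFO mailboxes both references end at 7 whatever the scheduling: Q computes 1 + 3 = 4, adds the first message 1, sends 5, then adds the second message 2; P doubles x to 2 and adds the 5 received from Q.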
We can classify concurrent applications into five categories
according to the program units that compose them:
- unrelated;
- related, but without synchronization;
- related, with mutual exclusion;
- related, with mutual exclusion and communication;
- related, without mutual exclusion, and with synchronous communication.
The difficulty of implementation comes principally from these last
categories. Now we will see how to resolve these difficulties by
using the Objective CAML libraries.
Compilation with Threads
The Objective CAML thread library is divided into five modules, of which the
first four each define an abstract type:
- module Thread: creation and execution of threads
(type Thread.t);
- module Mutex: creation, locking and release of mutexes
(type Mutex.t);
- module Condition: creation of conditions (signals),
waiting and waking up on a condition (type Condition.t);
- module Event: creation of communication channels
(type 'a Event.channel), the values which they carry
(type 'a Event.event), and communication functions;
- module ThreadUnix: redefinitions of the I/O functions
of module Unix so that they are not blocking.
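As a first taste of the modules Thread and Mutex, here is a minimal sketch of the shared-reference program from the beginning of this section, with each update protected by a mutex. The helper name atomically is ours, and the sketch assumes that the update function does not raise an exception (otherwise the mutex would stay locked).

```ocaml
let x = ref 1
let m = Mutex.create ()

(* Apply f to the contents of x while holding the mutex, so that the
   read and the write of one process cannot be split by the other. *)
let atomically f =
  Mutex.lock m;
  x := f !x;
  Mutex.unlock m

let () =
  let p = Thread.create atomically (fun v -> v + 1) in  (* process P *)
  let q = Thread.create atomically (fun v -> v * 2) in  (* process Q *)
  Thread.join p;
  Thread.join q;
  Printf.printf "x = %d\n" !x
```

Mutual exclusion removes the result 2, since the two updates can no longer overlap, but the order of P and Q is still not fixed: the program prints 3 or 4.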
This library is not part of the execution library of Objective CAML. Its
use requires the option -custom both for compiling
programs and for constructing a new toplevel by using
the commands:
$ ocamlc -thread -custom threads.cma files.ml -cclib -lthreads
$ ocamlmktop -thread -custom -o threadtop threads.cma -cclib -lthreads
The Threads library is not usable with the native compiler
unless the platform implements threads conforming to the POSIX
1003 standard. In that case we
compile executables by adding the libraries unix.a and
pthread.a:
$ ocamlc -thread -custom threads.cma files.ml -cclib -lthreads \
-cclib -lunix -cclib -lpthread
$ ocamlmktop -thread -custom threads.cma files.ml -cclib -lthreads \
-cclib -lunix -cclib -lpthread
$ ocamlopt -thread threads.cmxa files.ml -cclib -lthreads \
-cclib -lunix -cclib -lpthread
Module Thread
The Objective CAML Thread module contains the primitives for
the creation and management of threads. We will not present it
exhaustively; for instance, the file I/O operations have already been
described in the preceding chapter.
A thread is created through a call to:
# Thread.create ;;
- : ('a -> 'b) -> 'a -> Thread.t = <fun>
The first argument, of type 'a -> 'b, corresponds to the
function executed by the created process; the second argument, of type
'a, is the argument required by the executed function; the
result of the call is the descriptor associated with the process. The
process thus created is automatically destroyed when the associated
function terminates.
Knowing its descriptor, we can wait for a process to finish
by using the function join, which suspends the caller until that
process terminates. Here is a usage example:
# let f_proc1 () = for i=0 to 10 do Printf.printf "(%d)" i; flush stdout done;
   print_newline() ;;
val f_proc1 : unit -> unit = <fun>
# let t1 = Thread.create f_proc1 () ;;
val t1 : Thread.t = <abstr>
# Thread.join t1 ;;
(0)(1)(2)(3)(4)(5)(6)(7)(8)(9)(10)
- : unit = ()
Warning
The result of the execution of a process is not recovered by the
parent process, but lost when the child process terminates.
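Since join returns unit, one way for the parent to obtain a value computed by a child is to have the child store it in shared memory before terminating. The following is a minimal sketch of that idea; the function name square_in_thread and the use of an option reference are our own choices, not part of the Thread module.

```ocaml
(* Run the computation v * v in a child thread and hand the result back
   to the parent through a reference shared by both. *)
let square_in_thread n =
  let result = ref None in
  let t = Thread.create (fun v -> result := Some (v * v)) n in
  Thread.join t;   (* after join, the child has finished writing *)
  !result

let () =
  match square_in_thread 7 with
  | Some r -> Printf.printf "result = %d\n" r
  | None -> print_endline "no result"
```

Reading the reference only after join avoids any race with the child; reading it earlier would require a mutex or a condition.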
We can also abruptly interrupt the execution of a process whose
descriptor we know, using the function
kill. For instance, we create a process which is immediately interrupted:
# let n = ref 0 ;;
val n : int ref = {contents=0}
# let f_proc1 () = while true do incr n done ;;
val f_proc1 : unit -> unit = <fun>
# let go () = n := 0 ;
   let t1 = Thread.create f_proc1 ()
   in Thread.kill t1 ;
      Printf.printf "n = %d\n" !n ;;
val go : unit -> unit = <fun>
# go () ;;
n = 0
- : unit = ()
A process can put an end to its own activity by the function:
# Thread.exit ;;
- : unit -> unit = <fun>
It can suspend its activity for a given time by a call to:
# Thread.delay ;;
- : float -> unit = <fun>
The argument stands for the number of seconds to wait.
Let us return to the previous example and add timing. We create a
first process t1 whose associated function f_proc2 in turn creates
a process t2 which executes f_proc1. Then f_proc2 waits for
d seconds and kills t2. On termination of
t1, we print the contents of n.
# let f_proc2 d = n := 0 ;
   let t2 = Thread.create f_proc1 ()
   in Thread.delay d ;
      Thread.kill t2 ;;
val f_proc2 : float -> unit = <fun>
# let t1 = Thread.create f_proc2 0.25
   in Thread.join t1 ;
      Printf.printf "n = %d\n" !n ;;
n = 128827
- : unit = ()
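The value printed for n varies from one run and one machine to another. Note also that recent OCaml releases no longer provide Thread.kill (to our knowledge it was removed in OCaml 5). The following is a minimal sketch of the same timed experiment using cooperative cancellation instead: the worker polls a shared flag and stops by itself. It assumes the Atomic module of the standard library (OCaml 4.12 or later); the flag name stop is ours, and the call to Thread.yield is added so that the worker regularly gives other threads a chance to run.

```ocaml
let n = Atomic.make 0
let stop = Atomic.make false   (* hypothetical cancellation flag *)

(* Count until asked to stop, instead of looping forever. *)
let f_proc1 () =
  while not (Atomic.get stop) do
    Atomic.incr n;
    Thread.yield ()
  done

(* Start the counter, wait d seconds, then request termination. *)
let f_proc2 d =
  Atomic.set n 0;
  Atomic.set stop false;
  let t2 = Thread.create f_proc1 () in
  Thread.delay d;
  Atomic.set stop true;   (* replaces Thread.kill t2 *)
  Thread.join t2          (* wait until t2 has really finished *)

let () =
  let t1 = Thread.create f_proc2 0.25 in
  Thread.join t1;
  Printf.printf "n = %d\n" (Atomic.get n)
```

Unlike kill, this scheme only works if the worker checks the flag regularly, but it lets the worker terminate in a known state.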