Synchronization of Processes
In the setting of processes sharing a common zone of memory,
the word ``concurrency'' carries its full meaning: the various processes
involved compete for access to the unique resource
of the memory.
To the problem of sharing resources is added
that of the lack of control over the interleaving and the execution
times of the concurrent processes.
The system which manages the collection of processes can at any moment
interrupt a calculation in progress. Thus when two processes
cooperate, they must be able to guarantee the integrity of the
manipulations of certain shared data. For this, a process should be
able to remain owner of these data as long as it has not completed a
calculation or any other operation (for example, an acquisition of
data from a peripheral). To guarantee the exclusivity of access to
the data to a single process, we set up a mechanism called
mutual exclusion.
Critical Section and Mutual Exclusion
The mechanisms of mutual exclusion are implemented with the help of
particular data structures called mutexes. The operations on
mutexes are limited to their creation, their acquisition (locking), and their
release (unlocking). A mutex is the smallest item of data shared by a collection
of concurrent processes. Its manipulation is always exclusive. To
the notion of exclusivity of manipulation of a mutex is added that of
exclusivity of possession: only the process which has taken a
mutex can free it; if other processes wish to use
the mutex, then they must wait for it to be released by the
process that is holding it.
Mutex Module
The Mutex module is used to create mutexes shared by processes
that are in mutual exclusion over an area of memory. We will
illustrate its use with two small classic examples of concurrency.
The functions for creating, locking, and unlocking mutexes are:
# Mutex.create ;;
- : unit -> Mutex.t = <fun>
# Mutex.lock ;;
- : Mutex.t -> unit = <fun>
# Mutex.unlock ;;
- : Mutex.t -> unit = <fun>
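As a small illustration of these three functions, here is a minimal sketch, not taken from the examples of this chapter, in which four threads increment a shared counter inside a critical section. The names counter, counter_lock, add_many and run_counter are hypothetical, and the program is assumed to be linked with the threads library.

```ocaml
(* Hypothetical names; assumes the program is linked with the threads library. *)
let counter = ref 0
let counter_lock = Mutex.create ()

(* Add 1 to the shared counter [n] times, holding the mutex for each update. *)
let add_many n =
  for _i = 1 to n do
    Mutex.lock counter_lock;
    (* Critical section: the read and the write are separated by a yield,
       which would invite lost updates if the mutex were not held. *)
    let v = !counter in
    Thread.yield ();
    counter := v + 1;
    Mutex.unlock counter_lock
  done

(* Start four threads, wait for all of them, and return the final value. *)
let run_counter () =
  counter := 0;
  let threads = List.init 4 (fun _ -> Thread.create add_many 1000) in
  List.iter Thread.join threads;
  !counter

let () = Printf.printf "final count: %d\n" (run_counter ())
(* prints "final count: 4000" *)
```

Because every read-modify-write of the counter happens between Mutex.lock and Mutex.unlock, no increment can be lost, whatever the interleaving chosen by the system.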
There exists a variant of mutex locking that is non-blocking:
# Mutex.try_lock;;
- : Mutex.t -> bool = <fun>
If the mutex is already locked, the function returns false.
Otherwise, the function locks the mutex and returns
true.
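The following sketch shows one way Mutex.try_lock might be used to skip some work rather than wait for a busy mutex. The names resource, try_with_lock and demo_try_lock are hypothetical; the sketch assumes the threads library and a version of OCaml that provides Fun.protect.

```ocaml
(* Hypothetical names; assumes the threads library and Fun.protect. *)
let resource = Mutex.create ()

(* Run [work] under the mutex if it is free, and report whether it ran. *)
let try_with_lock (work : unit -> unit) : bool =
  if Mutex.try_lock resource then begin
    (* Fun.protect releases the mutex even if [work] raises an exception. *)
    Fun.protect ~finally:(fun () -> Mutex.unlock resource) work;
    true
  end else
    false

let demo_try_lock () =
  (* First attempt: nobody holds the mutex. *)
  let when_free = try_with_lock (fun () -> ()) in
  (* Second attempt: the main thread holds the mutex while another thread tries. *)
  Mutex.lock resource;
  let when_held = ref true in
  let t =
    Thread.create (fun () -> when_held := try_with_lock (fun () -> ())) () in
  Thread.join t;
  Mutex.unlock resource;
  (when_free, !when_held)

let () =
  let when_free, when_held = demo_try_lock () in
  Printf.printf "free: %b, held: %b\n" when_free when_held
(* prints "free: true, held: false" *)
```

The second thread is never suspended: it learns at once that the mutex is taken and carries on.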
The Dining Philosophers
This little story, due to Dijkstra, illustrates a pure problem of
resource allocation. It goes as follows:
``Five oriental philosophers divide their time between study and
coming to the refectory to eat a bowl of rice. The room devoted to
feeding the philosophers contains nothing but a single round
table on which there is a large dish of rice (always full), five
bowls, and five chopsticks.''
Figure 19.1: The Table of the Dining Philosophers
As we can see in figure 19.1, a philosopher who
takes the two chopsticks beside his bowl stops his neighbours from
doing the same. When he puts down one of his chopsticks, his
famished neighbour can grab it. If need be, the latter must wait
until the other chopstick is available. Here the chopsticks are the
resources to be allocated.
To simplify things, we suppose that each philosopher habitually comes
to the same place at the table. We model the five chopsticks as five mutexes stored
in a vector b.
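# let b =
    (* Array.make shares one mutex among all the cells, *)
    (* so cells 1 to 4 each receive a fresh one.        *)
    let b0 = Array.make 5 (Mutex.create ()) in
    for i = 1 to 4 do b0.(i) <- Mutex.create () done ;
    b0 ;;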
val b : Mutex.t array = [|<abstr>; <abstr>; <abstr>; <abstr>; <abstr>|]
Eating and meditation are simulated by a suspension of processes.
# let meditation = Thread.delay
  and eating = Thread.delay ;;
val meditation : float -> unit = <fun>
val eating : float -> unit = <fun>
We model a philosopher by a function which executes an infinite
sequence of actions from Dijkstra's story. Taking a chopstick
is simulated by the acquisition of a mutex, so that only one philosopher
can hold a given chopstick at a time. We introduce a short period of
reflection between the taking and the dropping of each of the two
chopsticks, while a number of output commands track the activity of the
philosopher.
# let philosopher i =
    let ii = (i+1) mod 5 in
    while true do
      meditation 3. ;
      Mutex.lock b.(i) ;
      Printf.printf "Philosopher (%d) takes his left-hand chopstick" i ;
      Printf.printf " and meditates a little while more\n" ;
      meditation 0.2 ;
      Mutex.lock b.(ii) ;
      Printf.printf "Philosopher (%d) takes his right-hand chopstick\n" i ;
      eating 0.5 ;
      Mutex.unlock b.(i) ;
      Printf.printf "Philosopher (%d) puts down his left-hand chopstick" i ;
      Printf.printf " and goes back to meditating\n" ;
      meditation 0.15 ;
      Mutex.unlock b.(ii) ;
      Printf.printf "Philosopher (%d) puts down his right-hand chopstick\n" i
    done ;;
val philosopher : int -> unit = <fun>
We can test this little program by executing:
for i = 0 to 4 do ignore (Thread.create philosopher i) done ;
while true do Thread.delay 5. done ;;
In the infinite while loop, we suspend the main process
in order to increase the chances that the philosopher processes get to run.
The delays used in the activity loop differ from step to step, with the aim of
creating some disparity in the parallel execution of the processes.
Problems of the naïve solution.
A terrible thing can happen to our philosophers: they all arrive at
the same time and seize the chopstick on their left. In this case we
are in a situation of deadlock. None of the philosophers
can eat! We are in a situation of starvation.
To avoid this, the philosophers can put down a chopstick if they do not
manage to take the second one. This is highly courteous, but still
allows two philosophers to gang up against a third to stop him from
eating, by not letting go of their chopsticks, except the ones that
their other neighbour has given them. There exist numerous solutions
to this problem. One of them is the subject of an exercise.
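As an illustration, here is a sketch of one well-known remedy for the deadlock, which is not necessarily the one proposed in the exercise: every philosopher takes the lower-numbered of his two chopsticks first, so that no circular chain of waiting philosophers can form. The names seats, chopsticks, meals, ordered_philosopher and run_dinner are hypothetical, and the philosophers eat a finite number of times so that the program terminates.

```ocaml
(* Hypothetical names; assumes the threads library. *)
let seats = 5
let chopsticks = Array.init seats (fun _ -> Mutex.create ())
let meals = Array.make seats 0   (* meals.(i) is written by philosopher i only *)

(* Philosopher [i] eats [rounds] times, always locking the lower-numbered
   chopstick first; this global order rules out a cycle of waiters. *)
let ordered_philosopher rounds i =
  let left = i and right = (i + 1) mod seats in
  let first = min left right and second = max left right in
  for _round = 1 to rounds do
    Thread.delay 0.001;                  (* meditating *)
    Mutex.lock chopsticks.(first);
    Mutex.lock chopsticks.(second);
    meals.(i) <- meals.(i) + 1;          (* eating *)
    Mutex.unlock chopsticks.(second);
    Mutex.unlock chopsticks.(first)
  done

(* Seat the five philosophers, wait for them, and return the meal counts. *)
let run_dinner rounds =
  Array.fill meals 0 seats 0;
  let threads =
    List.init seats (fun i -> Thread.create (ordered_philosopher rounds) i) in
  List.iter Thread.join threads;
  Array.to_list meals
```

Calling run_dinner 20 should return a list of five 20s. Only the last philosopher is affected by the ordering: he reaches for chopstick 0 before chopstick 4. This removes the deadlock, but it does not by itself make the sharing fair.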
Producers and Consumers I
The producer-consumer pair is a classic example of
concurrent programming. A group of processes, designated the
producers, is in charge of storing data in a queue; a second group,
the consumers, is in charge of removing it. Each intervening party
excludes the others.
We implement this scheme using a queue shared between the producers
and the consumers. To guarantee the proper operation of the system,
the queue is manipulated in mutual exclusion in order to guarantee the
integrity of the operations of addition and removal.
f is the shared queue, and m is the mutex.
# let f = Queue.create () and m = Mutex.create () ;;
val f : '_a Queue.t = <abstr>
val m : Mutex.t = <abstr>
We divide the activity of a producer into two parts: creating a
product (function produce) and storing a product (function
store). Only the operation of storage needs the mutex.
# let produce i p d =
    incr p ;
    Thread.delay d ;
    Printf.printf "Producer (%d) has produced %d\n" i !p ;
    flush stdout ;;
val produce : int -> int ref -> float -> unit = <fun>
# let store i p =
    Mutex.lock m ;
    Queue.add (i,!p) f ;
    Printf.printf "Producer (%d) has added its %dth product\n" i !p ;
    flush stdout ;
    Mutex.unlock m ;;
val store : int -> int ref -> unit = <fun>
The code of the producer is an endless loop of creation and storage.
We introduce a random delay at the end of each iteration in order to desynchronize
the execution.
# let producer i =
    let p = ref 0 and d = Random.float 2. in
    while true do
      produce i p d ;
      store i p ;
      Thread.delay (Random.float 2.5)
    done ;;
val producer : int -> unit = <fun>
The only operation of the consumer is the retrieval of an element of
the queue, taking care that the product is actually there.
# let consumer i =
    while true do
      Mutex.lock m ;
      ( try
          let ip, p = Queue.take f in
          Printf.printf "The consumer(%d) " i ;
          Printf.printf "has taken product (%d,%d)\n" ip p ;
          flush stdout ;
        with
          Queue.Empty ->
            Printf.printf "The consumer(%d) " i ;
            print_string "has returned empty-handed\n" ) ;
      Mutex.unlock m ;
      Thread.delay (Random.float 2.5)
    done ;;
val consumer : int -> unit = <fun>
The following test program creates four producers and four consumers.
for i = 0 to 3 do
  ignore (Thread.create producer i) ;
  ignore (Thread.create consumer i)
done ;
while true do Thread.delay 5. done ;;
Waiting and Synchronization
The relation of mutual exclusion is not ``fine'' enough to describe
synchronization between processes. It is not rare that the work of a
process depends on the completion of an action by another process,
thus modifying a certain condition. It is therefore desirable that
the processes should be able to communicate the fact that this
condition might have changed, indicating to the waiting processes to
test it again. The different processes are thus in a relation of
mutual exclusion with communication.
In the preceding example, a consumer, rather than returning
empty-handed, could wait until a producer came to resupply the stock.
The latter could then signal to the waiting consumer that there is something
to take. The model of waiting on a condition to take a mutex is known
as a semaphore.
Semaphores.
A semaphore is an integer variable s which can only take non-negative
values. Once s is initialised, the only operations
allowed are wait(s) and signal(s), written P(s) and V(s)
respectively. They are defined as follows, s corresponding to the number
of available resources of a given type.
- wait(s): if s > 0 then s := s - 1, otherwise the process that called wait(s) is suspended.
- signal(s): if a process has been suspended after a prior invocation of wait(s), then wake it up, otherwise s := s + 1.
A semaphore which only takes the values 0 or 1 is called a
binary semaphore.
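To make these definitions concrete, here is a minimal sketch of a counting semaphore built from a mutex and a condition variable of the Condition module. The type semaphore and the functions make_semaphore, sem_wait, sem_signal and demo_semaphore are hypothetical names. Unlike the definition above, this sem_signal always increments the counter and lets the woken process decrement it, which should have the same observable effect.

```ocaml
(* Hypothetical names; assumes the threads library. *)
type semaphore = {
  mutable count : int;      (* the value s, never negative *)
  lock : Mutex.t;           (* protects [count] *)
  positive : Condition.t;   (* signalled whenever [count] is incremented *)
}

let make_semaphore initial =
  if initial < 0 then invalid_arg "make_semaphore";
  { count = initial; lock = Mutex.create (); positive = Condition.create () }

(* wait(s), or P(s): sleep until s > 0, then decrement it. *)
let sem_wait s =
  Mutex.lock s.lock;
  while s.count = 0 do Condition.wait s.positive s.lock done;
  s.count <- s.count - 1;
  Mutex.unlock s.lock

(* signal(s), or V(s): increment s and wake one suspended process, if any. *)
let sem_signal s =
  Mutex.lock s.lock;
  s.count <- s.count + 1;
  Condition.signal s.positive;
  Mutex.unlock s.lock

(* Six workers share two resources; we record how many hold one at once. *)
let demo_semaphore () =
  let s = make_semaphore 2 in
  let stats = Mutex.create () in
  let inside = ref 0 and max_inside = ref 0 in
  let worker () =
    sem_wait s;
    Mutex.lock stats;
    incr inside;
    if !inside > !max_inside then max_inside := !inside;
    Mutex.unlock stats;
    Thread.delay 0.01;                 (* use the resource *)
    Mutex.lock stats;
    decr inside;
    Mutex.unlock stats;
    sem_signal s
  in
  let threads = List.init 6 (fun _ -> Thread.create worker ()) in
  List.iter Thread.join threads;
  (!max_inside, s.count)
```

demo_semaphore returns the largest number of workers seen holding a resource at the same moment, which should never exceed 2, together with the final value of the semaphore, which should be back to 2.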
Condition Module
The functions of the module Condition implement the
primitives of putting to sleep and waking up processes on a signal. A
signal, in this case, is a variable shared by a collection of
processes. Its type is abstract and the manipulation functions
are:
- create : unit -> Condition.t, which creates a new signal.
- signal : Condition.t -> unit, which wakes up one of the processes waiting on a signal.
- broadcast : Condition.t -> unit, which wakes up all of the processes waiting on a signal.
- wait : Condition.t -> Mutex.t -> unit, which suspends the calling process on the signal passed as the first argument. The second argument is a mutex used to protect the manipulation of the signal. It is released while the process sleeps and locked again before the function returns.
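The usual way of combining these functions can be sketched as follows: the waiting process tests a shared condition in a loop while holding the mutex, and the signalling process changes the condition under the same mutex before calling signal. The names gate, changed, ready, events, waiter, setter and demo_condition are hypothetical.

```ocaml
(* Hypothetical names; assumes the threads library. *)
let gate = Mutex.create ()
let changed = Condition.create ()
let ready = ref false
let events = ref []   (* trace of events, most recent first; protected by [gate] *)

(* Sleep until [ready] becomes true. *)
let waiter () =
  Mutex.lock gate;
  (* Condition.wait releases [gate] while sleeping and holds it again on
     return, so the test is repeated safely after every wake-up. *)
  while not !ready do Condition.wait changed gate done;
  events := "waiter resumed" :: !events;
  Mutex.unlock gate

(* Change the condition under the mutex, then wake one waiting process. *)
let setter () =
  Mutex.lock gate;
  ready := true;
  events := "flag set" :: !events;
  Condition.signal changed;
  Mutex.unlock gate

let demo_condition () =
  ready := false;
  events := [];
  let w = Thread.create waiter () in
  Thread.delay 0.05;                   (* give the waiter time to go to sleep *)
  let s = Thread.create setter () in
  Thread.join w;
  Thread.join s;
  List.rev !events
```

Whatever the scheduling, demo_condition returns ["flag set"; "waiter resumed"]: the waiter can only leave its loop once the flag has been set under the mutex.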
Producers and Consumers II
We revisit the example of producers and consumers by using the
mechanism of condition variables to put to sleep a consumer arriving
when the storehouse is empty.
To implement synchronization between waiting consumers and production,
we declare:
# let c = Condition.create () ;;
val c : Condition.t = <abstr>
We modify the storage function of the producer by adding to it the
sending of a signal:
# let store2 i p =
    Mutex.lock m ;
    Queue.add (i,!p) f ;
    Printf.printf "Producer (%d) has added its %dth product\n" i !p ;
    flush stdout ;
    Condition.signal c ;
    Mutex.unlock m ;;
val store2 : int -> int ref -> unit = <fun>
# let producer2 i =
    let p = ref 0 in
    let d = Random.float 2. in
    while true do
      produce i p d ;
      store2 i p ;
      Thread.delay (Random.float 2.5)
    done ;;
val producer2 : int -> unit = <fun>
The activity of the consumer takes place in two phases: waiting until
a product is available, then taking the product. The mutex is taken
when the wait is finished and it is released when the consumer has
taken its product. The wait takes place on the variable c.
# let wait2 i =
    Mutex.lock m ;
    while Queue.length f = 0 do
      Printf.printf "Consumer (%d) is waiting\n" i ;
      Condition.wait c m
    done ;;
val wait2 : int -> unit = <fun>
# let take2 i =
    let ip, p = Queue.take f in
    Printf.printf "Consumer (%d) " i ;
    Printf.printf "takes product (%d, %d)\n" ip p ;
    flush stdout ;
    Mutex.unlock m ;;
val take2 : int -> unit = <fun>
# let consumer2 i =
    while true do
      wait2 i ;
      take2 i ;
      Thread.delay (Random.float 2.5)
    done ;;
val consumer2 : int -> unit = <fun>
We note that it is no longer necessary, once a consumer
has finished waiting, to check for the existence of a product
before taking it. The end of its wait coincides with the
re-locking of the mutex, and the loop in wait2 tests the queue again
before returning, so the consumer does not run the risk of
having the new product stolen before it takes it.
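To check this behaviour on a run that terminates, here is a self-contained sketch of the same scheme in which each producer stores a fixed number of products and each consumer takes a fixed number. The names store_queue, store_lock, non_empty, finite_producer, finite_consumer and run_exchange are hypothetical and stand in for f, m, c and the functions above.

```ocaml
(* Hypothetical names; assumes the threads library. *)
let store_queue : (int * int) Queue.t = Queue.create ()
let store_lock = Mutex.create ()
let non_empty = Condition.create ()

(* Producer [i] stores products (i,1) ... (i,count), signalling after each. *)
let finite_producer count i =
  for p = 1 to count do
    Mutex.lock store_lock;
    Queue.add (i, p) store_queue;
    Condition.signal non_empty;
    Mutex.unlock store_lock;
    Thread.yield ()
  done

(* A consumer takes exactly [count] products, sleeping while the queue is
   empty; [taken] is shared and only modified while holding the mutex. *)
let finite_consumer count taken =
  for _k = 1 to count do
    Mutex.lock store_lock;
    while Queue.is_empty store_queue do
      Condition.wait non_empty store_lock
    done;
    taken := Queue.take store_queue :: !taken;
    Mutex.unlock store_lock
  done

(* Run as many consumers as producers and return the sorted list of
   everything that was taken. *)
let run_exchange producers per_producer =
  let taken = ref [] in
  let ps = List.init producers
      (fun i -> Thread.create (finite_producer per_producer) i) in
  let cs = List.init producers
      (fun _ -> Thread.create (finite_consumer per_producer) taken) in
  List.iter Thread.join (ps @ cs);
  List.sort compare !taken
```

With two producers storing two products each, run_exchange 2 2 should return [(0,1); (0,2); (1,1); (1,2)]: every product is taken exactly once, and no consumer ever calls Queue.take on an empty queue.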
Readers and Writers
Here is another classic example of concurrent processes in which the
agents do not have the same behaviour with respect to the shared data.
A writer and some readers operate on some shared data. The action of
the first may cause the data to be momentarily inconsistent, while the
second group only have a passive action. The difficulty arises from
the fact that we do not wish to prohibit multiple readers from
examining the data simultaneously. One solution to this problem is
to keep a counter of the number of readers in the process of
accessing the data. Writing is allowed only when the number of
readers is 0.
The data is symbolized by the integer data which takes the
value 0 or 1. The value 0 indicates that
the data is ready for reading:
# let data = ref 0 ;;
val data : int ref = {contents=0}
Operations on the counter n are protected by the mutex m:
# let n = ref 0 ;;
val n : int ref = {contents=0}
# let m = Mutex.create () ;;
val m : Mutex.t = <abstr>
# let cpt_incr () = Mutex.lock m ; incr n ; Mutex.unlock m ;;
val cpt_incr : unit -> unit = <fun>
# let cpt_decr () = Mutex.lock m ; decr n ; Mutex.unlock m ;;
val cpt_decr : unit -> unit = <fun>
# let cpt_signal () =
    Mutex.lock m ;
    if !n = 0 then Condition.signal c ;
    Mutex.unlock m ;;
val cpt_signal : unit -> unit = <fun>
The readers update the counter and emit the signal c when no
more readers are present. This is how they indicate to the writer
that it may come into action.
# let c = Condition.create () ;;
val c : Condition.t = <abstr>
# let read i =
    cpt_incr () ;
    Printf.printf "Reader (%d) read (data=%d)\n" i !data ;
    Thread.delay (Random.float 1.5) ;
    Printf.printf "Reader (%d) has finished reading\n" i ;
    cpt_decr () ;
    cpt_signal () ;;
val read : int -> unit = <fun>
# let reader i =
    while true do
      read i ;
      Thread.delay (Random.float 1.5)
    done ;;
val reader : int -> unit = <fun>
The writer needs to block the counter to prevent the readers from
accessing the shared data. But it can only do so if the counter is 0,
otherwise it waits for the signal indicating that this is the
case.
# let write () =
    Mutex.lock m ;
    while !n <> 0 do Condition.wait c m done ;
    print_string "The writer is writing\n" ; flush stdout ;
    data := 1 ;
    Thread.delay (Random.float 1.) ;
    data := 0 ;
    Mutex.unlock m ;;
val write : unit -> unit = <fun>
# let writer () =
    while true do
      write () ;
      Thread.delay (Random.float 1.5)
    done ;;
val writer : unit -> unit = <fun>
We create a writer and six readers to test these functions.
ignore (Thread.create writer ()) ;
for i = 0 to 5 do ignore (Thread.create reader i) done ;
while true do Thread.delay 5. done ;;
This solution guarantees that the writer and the readers cannot have
access to the data at the same time. On the other hand, nothing
guarantees that the writer will ever be able to ``fulfil his office'': here we
are confronted again with a case of starvation.
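One possible remedy, given here only as a sketch and not as the solution of the text, is to make newly arriving readers stand aside as soon as a writer is waiting. All the names below (rw_lock, state_changed, active_readers, waiting_writers, writing, shared, and the functions) are hypothetical, and the readers and the writer perform a finite number of rounds so that the program terminates.

```ocaml
(* Hypothetical names; assumes the threads library. *)
let rw_lock = Mutex.create ()
let state_changed = Condition.create ()
let active_readers = ref 0
let waiting_writers = ref 0
let writing = ref false
let shared = ref 0      (* 0 when consistent, 1 while being written *)
let bad_reads = ref 0   (* inconsistent values seen; protected by [rw_lock] *)
let writes_done = ref 0 (* updated by the writer only *)

let start_read () =
  Mutex.lock rw_lock;
  (* New readers stand aside while a writer is active or waiting. *)
  while !writing || !waiting_writers > 0 do
    Condition.wait state_changed rw_lock
  done;
  incr active_readers;
  Mutex.unlock rw_lock

let end_read () =
  Mutex.lock rw_lock;
  decr active_readers;
  if !active_readers = 0 then Condition.broadcast state_changed;
  Mutex.unlock rw_lock

let start_write () =
  Mutex.lock rw_lock;
  incr waiting_writers;
  while !writing || !active_readers > 0 do
    Condition.wait state_changed rw_lock
  done;
  decr waiting_writers;
  writing := true;
  Mutex.unlock rw_lock

let end_write () =
  Mutex.lock rw_lock;
  writing := false;
  Condition.broadcast state_changed;
  Mutex.unlock rw_lock

let fair_reader rounds =
  for _k = 1 to rounds do
    start_read ();
    let seen = !shared in
    Thread.delay 0.001;
    end_read ();
    if seen <> 0 then begin
      Mutex.lock rw_lock; incr bad_reads; Mutex.unlock rw_lock
    end
  done

let fair_writer rounds =
  for _k = 1 to rounds do
    start_write ();
    shared := 1;                       (* momentarily inconsistent *)
    Thread.delay 0.001;
    shared := 0;
    incr writes_done;
    end_write ()
  done

(* One writer and four readers; returns (writes completed, bad reads). *)
let run_readers_writers () =
  bad_reads := 0;
  writes_done := 0;
  let w = Thread.create fair_writer 5 in
  let rs = List.init 4 (fun _ -> Thread.create fair_reader 20) in
  List.iter Thread.join (w :: rs);
  (!writes_done, !bad_reads)
```

run_readers_writers should return (5, 0): the writer completes all its writes and no reader observes the data in its inconsistent state. The price of this choice is that the risk of starvation moves to the readers, should writers arrive without interruption.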