Example: Post Office
We present, to end this chapter, a slightly more complete example of a
concurrent program: modelling a common queue at a number of counters
at a post office.
As always in concurrent programming the problems are posed
metaphorically, but replace the counters of the post office by a
collection of printers and you have the solution to a genuine problem
in computing.
Here the policy of service that we propose; it is well tried and tested,
rather than original: each client takes a number when he
arrives; when a clerk has finished serving a client, he calls for a
number. When his number is called, the client goes to the
corresponding counter.
Organization of development.
We distinguish in our development resources, and agents.
The former are: the number dispenser, the number announcer,
and the windows. The latter are: the clerks and the clients. The
resources are modeled by objects which manage their own mutual exclusion
mechanisms. The agents are modelled by
functions executed by a thread. When an agent wishes to modify or examine
the state of an object, it does not itself have to know about or
manipulate mutexes, which allows a simplified organization for access to sensitive data,
and avoids oversights in the coding of the agents.
The Components
The Dispenser.
The number dispenser contains two fields: a counter and a mutex. The
only method provided by the distributor is the taking of a new number.
# class
dispenser
()
=
object
val
mutable
n
=
0
val
m
=
Mutex.create()
method
take
()
=
let
r
=
Mutex.lock
m
;
n
<-
n+
1
;
n
in
Mutex.unlock
m
;
r
end
;;
class dispenser :
unit ->
object val m : Mutex.t val mutable n : int method take : unit -> int end
The mutex prevents two clients from taking a number at the same time.
Note the way in which we use an intermediate variable (r) to
guarantee that the number calculated in the critical section is the
same as the one return by the method call.
The Announcer.
The announcer contains three fields: an integer (the client number
being called); a mutex and a condition variable. The two methods
are: (wait) which reads the number, and (call),
which modifies it.
# class
announcer
()
=
object
val
mutable
nclient
=
0
val
m
=
Mutex.create()
val
c
=
Condition.create()
method
wait
n
=
Mutex.lock
m;
while
n
>
nclient
do
Condition.wait
c
m
done;
Mutex.unlock
m;
method
call
()
=
let
r
=
Mutex.lock
m
;
nclient
<-
nclient+
1
;
nclient
in
Condition.broadcast
c
;
Mutex.unlock
m
;
r
end
;;
The condition variable is used to put the clients to sleep, waiting
for their number. They are all woken up when the method call is
invoked. Reading or writing access to the called number is protected
by the mutex.
The window.
The window consists of five fields: a fixed window number (variable
ncounter); the number of the client being waited for
(variable nclient); a boolean (variable
available); a mutex, and a condition variable.
It offers eight methods, of which two are private: two simple access
methods (methods get_ncounter and get_nclient): a
group of three methods simulating the waiting period of the clerk
between two clients (private method wait and public methods
await_arrival, await_departure); a group of three
methods simulate the occupation of the window (private method
set_available and methods arrive, depart).
# class
counter
(i:
int)
=
object(self)
val
ncounter
=
i
val
mutable
nclient
=
0
val
mutable
available
=
true
val
m
=
Mutex.create()
val
c
=
Condition.create()
method
get_ncounter
=
ncounter
method
get_nclient
=
nclient
method
private
wait
f
=
Mutex.lock
m
;
while
f
()
do
Condition.wait
c
m
done
;
Mutex.unlock
m
method
wait_arrival
n
=
nclient
<-
n
;
self#wait
(fun
()
->
available)
method
wait_departure
()
=
self#wait
(fun
()
->
not
available)
method
private
set_available
b
=
Mutex.lock
m
;
available
<-
b
;
Condition.signal
c
;
Mutex.unlock
m
method
arrive
()
=
self#set_available
false
method
leave
()
=
self#set_available
true
end
;;
A post office.
We collect these three resources in a record type:
# type
office
=
{
d
:
dispenser
;
a
:
announcer
;
cs
:
counter
array
}
;;
Clients and Clerks
The behaviour of the system as a whole will depend on the three
following parameters:
# let
service_delay
=
1
.
7
;;
# let
arrival_delay
=
1
.
7
;;
# let
counter_delay
=
0
.
5
;;
Each represents the maximum value of the range from which each
effective value will be randomly chosen. The first parameter models
the time taken to serve a client; the second, the delay between the
arrival of clients in the post office; the last, the time it
takes a clerk to call a new client after the last one has left.
The Clerk.
The work of a clerk consists of looping indefinitely over the
following sequence:
-
Call for a number.
- Wait for the arrival of a client holding the called number.
- Wait for the departure of the client occupying his counter.
Adding some output, we get the function:
# let
clerk
((a:
announcer),
(c:
counter))
=
while
true
do
let
n
=
a#call
()
in
Printf.printf
"Counter %d calls %d\n"
c#get_ncounter
n
;
c#wait_arrival
n
;
c#wait_departure
()
;
Thread.delay
(Random.float
counter_delay)
done
;;
val clerk : announcer * counter -> unit = <fun>
The Client.
A client executes the following sequence:
-
Take a waiting number.
- Wait until his number is called.
- Go to the window having called for the number to obtain service.
The only slightly complex activity of the client is to find the counter
where they are expected.
We give, for this, the auxiliary function:
# let
find_counter
n
cs
=
let
i
=
ref
0
in
while
cs.
(!
i)#get_ncounter
<>
n
do
incr
i
done
;
!
i
;;
val find_counter : 'a -> < get_ncounter : 'a; .. > array -> int = <fun>
Adding some output, the principal function of the client is:
# let
client
o
=
let
n
=
o.
d#take()
in
Printf.printf
"Arrival of client %d\n"
n
;
flush
stdout
;
o.
a#wait
n
;
let
ic
=
find_counter
n
o.
cs
in
o.
cs.
(ic)#arrive
()
;
Printf.printf
"Client %d occupies window %d\n"
n
ic
;
flush
stdout
;
Thread.delay
(Random.float
service_delay)
;
o.
cs.
(ic)#leave
()
;
Printf.printf
"Client %d leaves\n"
n
;
flush
stdout
;;
val client : office -> unit = <fun>
The System
The main programme of the application creates a post office and
its clerks (each clerk is a process) then launches a process which
creates an infinite stream of clients (each client is also a process).
# let
main
()
=
let
o
=
{
d
=
new
dispenser();
a
=
new
announcer();
cs
=
(let
cs0
=
Array.create
5
(new
counter
0
)
in
for
i=
0
to
4
do
cs0.
(i)
<-
new
counter
i
done;
cs0)
}
in
for
i=
0
to
4
do
ignore
(Thread.create
clerk
(o.
a,
o.
cs.
(i)))
done
;
let
create_clients
o
=
while
true
do
ignore
(Thread.create
client
o)
;
Thread.delay
(Random.float
arrival_delay)
done
in
ignore
(Thread.create
create_clients
o)
;
Thread.sleep
()
;;
val main : unit -> unit = <fun>
The last instruction puts the process associated with the program to
sleep in order to pass control immediately to the other active
processes of the application.