Concurrency With Multithreading
Note
To follow the materials mentioned in this section, change to the layer-cake-demos/multithreading
folder.
A function is needed for the following demonstrations. The texture()
function accepts size parameters and returns a
2D table of the specified dimensions, filled with random floating-point values. Execution of the code consumes computing
resources such as memory and CPU cycles.
# test_function_1.py
import random

random.seed()

def texture(x: int=8, y: int=8) -> list[list[float]]:
    table = []
    for r in range(y):
        row = [None] * x
        table.append(row)
        for c in range(x):
            row[c] = random.random()
    return table
Standard Python hints describe the nature of the values passed to the function (int) and the nature of what the function produces (list[list[float]]).
$ python3
...
>>> from test_function_1 import texture
>>> texture(x=2, y=2)
[[0.7612548315750599, 0.23818407873566672], [0.3034216434184227, 0.06614148124376695]]
>>>
Calling the function produces a list of lists matching the given dimensions.
Providing Network Access To The Function
The demonstration function needs to be combined with some network plumbing and management of request and response messages. An implementation is presented below. It introduces an asynchronous approach to software operation, also known as event-driven or message-driven software. The more pertinent aspects of this approach are covered in detail in following sections;
# test_server_1.py
import layer_cake as lc
from test_api import Xy, table_type
from test_function_1 import texture

DEFAULT_ADDRESS = lc.HostPort('127.0.0.1', 5050)
SERVER_API = [Xy,]

def server(self, server_address: lc.HostPort=None):
    server_address = server_address or DEFAULT_ADDRESS

    lc.listen(self, server_address, http_server=SERVER_API)
    m = self.input()
    if not isinstance(m, lc.Listening):
        return m

    while True:
        m = self.input()
        if isinstance(m, Xy):
            pass
        elif isinstance(m, lc.Faulted):
            return m
        elif isinstance(m, lc.Stop):
            return lc.Aborted()
        else:
            continue

        response = texture(x=m.x, y=m.y)
        self.send(lc.cast_to(response, table_type), self.return_address)

lc.bind(server)

if __name__ == '__main__':
    lc.create(server)
Execution of the server will produce output similar to that shown below (logs have been reduced to fit);
$ python3 test_server_1.py --debug-level=DEBUG
00:04:11.423 + <0000000e>ListenConnect - Created by <00000001>
00:04:11.424 < <0000000e>ListenConnect - Received Start from <00000001>
00:04:11.424 > <0000000e>ListenConnect - Sent SocketChannel to <00000001>
00:04:11.424 + <0000000f>ObjectDirectory[INITIAL] - Created by <00000001>
00:04:11.424 < <0000000f>ObjectDirectory[INITIAL] - Received Start ...
00:04:11.424 + <00000010>ObjectCollector[INITIAL] - Created by <00000001>
00:04:11.424 < <00000010>ObjectCollector[INITIAL] - Received Start ...
00:04:11.424 + <00000011>start_vector - Created by <00000001>
00:04:11.424 + <00000012>server - Created by <00000011>
00:04:11.424 ~ <0000000e>ListenConnect - Listening on "127.0.0.1:5050"
00:04:11.424 > <0000000e>ListenConnect - Sent Listening to <00000012>
00:04:11.424 < <00000012>server - Received Listening from <0000000e>
Further information on logging output can be found here.
Use curl
(or some other HTTP client) to make a call to the network service;
$ curl -s 'http://127.0.0.1:5050/Xy?x=2&y=2'
{
    "value": [
        "vector<vector<float8>>",
        [
            [
                0.7121297344671714,
                0.2617093660768349
            ],
            [
                0.44326145558200136,
                0.1843574524335293
            ]
        ],
        []
    ]
}
The 2D table of floats can be seen together with supporting information. Logs associated with the processing of the request will look like;
00:08:39.230 + <00000013>SocketProxy[INITIAL] - Created by <0000000e>
00:08:39.230 ~ <0000000e>ListenConnect - Accepted "127.0.0.1:56586" ...
00:08:39.230 > <0000000e>ListenConnect - Forward Accepted to <00000012> ...
00:08:39.231 > <0000000e>ListenConnect - Forward Xy to <00000012> ...
00:08:39.231 < <00000013>SocketProxy[INITIAL] - Received Start ...
00:08:39.231 < <00000012>server - Received Accepted from <00000013>
00:08:39.231 < <00000012>server - Received Xy from <00000013>
00:08:39.231 < <00000013>SocketProxy[NORMAL] - Received list_list_float ...
00:08:39.233 > <0000000e>ListenConnect - Sent Stop to <00000013>
00:08:39.233 > <0000000e>ListenConnect - Forward Closed to <00000012> ...
00:08:39.234 < <00000013>SocketProxy[NORMAL] - Received Stop ...
00:08:39.234 < <00000012>server - Received Closed from <00000013>
00:08:39.234 X <00000013>SocketProxy[NORMAL] - Destroyed
The connection from curl
is accepted, and is immediately followed by the inbound request. The server()
responds
with a table and the connection is terminated.
Note
A RESTful style of interfacing is supported through
the ReForm
and ResourceDispatch
classes. Refer to the test_server_resource.py
module, in the current folder, for an example of the related resource-based dispatching. Also,
this reference information
might be useful.
Declaring An API
Details about the API are defined separately in the test_api.py
file. This file remains the same for all the implementations;
# test_api.py
import layer_cake as lc

class Xy(object):
    def __init__(self, x: int=1, y: int=1):
        self.x = x
        self.y = y

lc.bind(Xy)

table_type = lc.def_type(list[list[float]])
To send a message (i.e. a named collection of typed members) in layer-cake it needs to be defined as a class. Type hints must be used to describe the arguments passed to the __init__() method. Lastly, the class is registered with the library using bind().
Registration prepares information needed for the conversion of the HTTP representation - /Xy?x=2&y=3
- into an
instance of the Xy
class during network messaging. It also prepares for logging.
To send anything other than a registered class, the type must be registered using the def_type() function. This produces a portable object that is used to mark the relevant Python data when required, e.g. cast_to(). As far as the Python type system is concerned the response variable is a list. Within the layer-cake machinery it is a list[list[float]] and that deeper detail needs to travel with the data when it passes between threads and processes.
Further information about registration of types can be found here. There is also a full reference available.
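As a small, hedged illustration (the vector_type name is invented, following the same pattern as table_type above), an additional type could be registered and later used to mark a plain Python value;
# A sketch only; follows the def_type()/cast_to() pattern shown above.
import layer_cake as lc

vector_type = lc.def_type(list[float])

# Inside an asynchronous function, a plain Python list can then be marked
# with its deeper type before being sent;
#     values = [0.25, 0.5, 0.75]
#     self.send(lc.cast_to(values, vector_type), some_address)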
A Brief Outline
An execution trace for the server, together with commentary appears below. General information about the operation of asynchronous software can be found here;
* lc.create(server)
* server(self, server_address)
* lc.listen(self, server_address, http_server)
* m = self.input()
* isinstance(m, Listening)
* while True
* m = self.input()
* isinstance(m, Xy)
* response = texture(x=m.x, y=m.y)
* c = lc.cast_to(response, table_type)
* self.send(c, self.return_address)
* m = self.input()
* ..
The call to create() causes the initiation of a platform thread and the new thread is directed to call the server() function.
Alongside the thread, a special object is created by the library and passed as the first argument. This provides access to asynchronous operations such as send(). It also contains the unique identity of the server() instance. Technically, there can be many running instances of a function such as server(), each with its own dedicated thread and self object.
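As a hedged sketch of that last point (the supervisor function and the second port are invented for illustration), additional instances of server() could be created from within another asynchronous function, each receiving its own thread and self;
# A sketch only; follows the create() pattern used throughout this section.
import layer_cake as lc
from test_server_1 import server

def supervisor(self):
    # Two independent instances of server(), each with its own thread and self,
    # listening on different ports so the two listens do not collide.
    a = self.create(server, server_address=lc.HostPort('127.0.0.1', 5050))
    b = self.create(server, server_address=lc.HostPort('127.0.0.1', 5051))
    while True:
        m = self.input()
        if isinstance(m, lc.Stop):
            return lc.Aborted()

lc.bind(supervisor)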
A call to listen() arranges for the setup of a TCP listen, at the given address. The library directs all events associated with the network port to the given identity (i.e. self), such as;
* Listening - the listen operation was successful,
* NotListening - the listen operation failed,
* Accepted - a client has successfully connected,
* Closed - an established connection has shut down,
* Xy - a request was sent by a connected client.
The server checks for a successful listen() and then enters an endless loop that waits for messages and responds according to the type of the message received.
In the case of the Xy message this involves a call to texture() and sending the result back to the identity that sent the request, i.e. self.return_address.
Sending and receiving of messages across a network is fully automated - activities such as serialization, marshaling and block I/O all occur
discreetly. The curl
client forms the proper HTTP representation of an Xy
message and the server()
function receives a fully
resolved instance of the Xy
class, using the input()
method. When the server()
responds there is a reversal of the
process, eventually resulting in a JSON encoding of the table within the body of an HTTP response message.
Sending a Stop
message is the standard mechanism for termination of asynchronous activity. In this context the message is generated
by the asynchronous runtime in response to a control-c. The standard response is to terminate with the Aborted
message.
A Faulted
message indicates a runtime problem. NotListening
is an example of a fault message, i.e. the NotListening
class derives from the Faulted
class. Testing with the isinstance(m, lc.Faulted)
call catches all derived messages. Terminating a
process with a fault produces specific handling in the asynchronous runtime - it is the means by which child processes deliver bad news to the
parent process. In the context of command-line operation, a diagnostic message is printed on stderr. Starting multiple copies of test_server_1.py will elicit this behaviour;
$ python3 test_server_1.py
test_server_1.py: cannot listen at "127.0.0.1:5050" ([Errno 98] Address already in use)
Messages are sent to an address passed to send(). These addresses are layer-cake addresses, i.e. not network addresses or machine pointers. At the point where a message is received, the address of the sending party is always available as self.return_address. This is how the response table is routed back to the proper HTTP client.
Message-driven software inevitably includes message dispatching code;
if isinstance(m, Xy):
    pass
elif isinstance(m, lc.Faulted):
    return m
elif isinstance(m, lc.Stop):
    return lc.Aborted()
else:
    continue
This dispatching can become cumbersome, especially when mixed with loops. Use of match-case might produce clearer code, as sketched below. Layer-cake includes the concept of machines, which tackles the issue of dispatching head on. A guide to the definition of machines can be found here. A machine approach has also been adopted for the creation of a test client.
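As a hedged sketch of that alternative (behaviour unchanged, Python 3.10 or later assumed), the dispatch from test_server_1.py could be written as;
# A sketch only; the same dispatch expressed with match-case.
while True:
    m = self.input()
    match m:
        case Xy():
            pass
        case lc.Faulted():
            return m
        case lc.Stop():
            return lc.Aborted()
        case _:
            continue

    response = texture(x=m.x, y=m.y)
    self.send(lc.cast_to(response, table_type), self.return_address)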
Perhaps the most important aspect to this initial implementation is the fundamentally asynchronous approach to the processing of an HTTP
request message. HTTP clients are restricted to a synchronous, request-response interaction with HTTP servers. There is no such constraint
on the internal workings of the server()
and it is in this area that effective concurrency can be delivered. Layer-cake can’t help
individual clients with the blocking nature of their HTTP requests but it can deliver true concurrency within the server activity, as it
juggles the requests from multiple connected clients.
Concurrency Using Multithreading
The first iteration of the server supports a single execution of the texture()
function at any one time. There can be multiple
connected clients but the associated requests are queued internally by the asynchronous framework and delivered to server()
one
at a time, through the input()
function. Until the load is heavy enough to overflow the internal queues, this is not a problem.
However, the average response time - that time between submitting an Xy
request and receiving the response table - is probably
sub-optimal. A few minor changes arrange for full concurrency;
# test_server_2.py
import layer_cake as lc
from test_api import Xy, table_type
from test_function_2 import texture

DEFAULT_ADDRESS = lc.HostPort('127.0.0.1', 5050)
SERVER_API = [Xy,]

def server(self, server_address: lc.HostPort=None):
    server_address = server_address or DEFAULT_ADDRESS

    # Open a network port for HTTP clients, e.g. curl.
    lc.listen(self, server_address, http_server=SERVER_API)
    m = self.input()
    if not isinstance(m, lc.Listening):
        return m

    # Run a live network service.
    while True:
        m = self.input()
        if isinstance(m, Xy):
            pass
        elif isinstance(m, lc.Returned):
            d = self.debrief()
            if isinstance(d, lc.OnReturned):
                d(self, m)
            continue
        elif isinstance(m, lc.Faulted):
            return m
        elif isinstance(m, lc.Stop):
            return lc.Aborted()
        else:
            continue

        # Callback for on_return.
        def respond(self, response, args):
            self.send(lc.cast_to(response, self.returned_type), args.return_address)

        a = self.create(texture, x=m.x, y=m.y)
        self.on_return(a, respond, return_address=self.return_address)

lc.bind(server)

if __name__ == '__main__':
    lc.create(server)
The direct call to texture() has been replaced with create(). The asynchronous framework initiates a platform thread and causes the new thread to call texture(). This is similar to what occurs during startup of the server, i.e. create().
An address for the new instance is returned in the a variable and that is used to register a callback to the respond() function. When the texture() call completes the framework generates a Returned message and routes it back to the server. Processing of the Returned message ultimately results in the deferred call to respond(), passing the response and the collection of saved arguments passed to on_return(), e.g. return_address=self.return_address. This is critical to ensuring that each response goes back to the proper client.
The result of these changes is that every execution of texture()
is discreetly provided with its own dedicated thread. There
can now be multiple instances of texture()
running inside the server at any one time. It is also entirely possible for instances
of texture()
to terminate “out of sequence”, e.g. where the request for a large table of random floats is followed by a request
for a small table and the latter returns before the former.
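As a hedged illustration (the request sizes and output file name are arbitrary), issuing a large request in the background and immediately following it with a small one will typically show the small response printing while the large one is still being computed;
$ curl -s 'http://127.0.0.1:5050/Xy?x=1000&y=1000' -o big.json &
$ curl -s 'http://127.0.0.1:5050/Xy?x=2&y=2'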
After creating a callback using on_return() the server() thread is immediately available for processing of the next message, preserving overall responsiveness. An overview of how callbacks fit into general asynchronous operation is available here.
A minor change was also required in test_function_2.py;
# test_function_2.py
import random
import layer_cake as lc

random.seed()

def texture(self, x: int=8, y: int=8) -> list[list[float]]:
    table = []
    for r in range(y):
        row = [None] * x
        table.append(row)
        for c in range(x):
            row[c] = random.random()
    return table

lc.bind(texture)
The texture() function is now being registered and a self argument has been added. This ensures that the function call signature matches the expectations of a create(), even though the argument is unused in this case. Registration of a function effectively creates a “thread entry-point” for that function.
Delegating Requests To A Worker
The second iteration of the server looks like a real improvement. However, to an experienced eye there are still problems. It is a convenient assumption that there is an endless supply of thread resources and that adding the next thread to the workload of the CPU is as beneficial as it was to add the first. Of course, neither of these things is true.
It’s also a consideration that the platform operation to initiate a thread consumes CPU time and avoiding the cost of constantly creating and destroying platform threads is probably a good idea.
A thread is needed that accepts multiple Xy
requests over its lifetime;
# test_worker_3.py
import layer_cake as lc
from test_api import Xy, table_type
from test_function_3 import texture

def worker(self):
    while True:
        m = self.input()
        if isinstance(m, Xy):
            pass
        elif isinstance(m, lc.Faulted):
            return m
        elif isinstance(m, lc.Stop):
            return lc.Aborted()
        else:
            continue

        table = texture(x=m.x, y=m.y)
        self.send(lc.cast_to(table, table_type), self.return_address)

lc.bind(worker)
To benefit from this approach the server needs to look like;
# test_server_3.py
import layer_cake as lc
from test_api import Xy, table_type
from test_worker_3 import worker

DEFAULT_ADDRESS = lc.HostPort('127.0.0.1', 5050)
SERVER_API = [Xy,]

def server(self, server_address: lc.HostPort=None):
    server_address = server_address or DEFAULT_ADDRESS

    # Open a network port for HTTP clients, e.g. curl.
    lc.listen(self, server_address, http_server=SERVER_API)
    m = self.input()
    if not isinstance(m, lc.Listening):
        return m

    # Start a request processor in a separate thread.
    worker_address = self.create(worker)

    # Run a live network service.
    while True:
        m = self.input()
        if isinstance(m, Xy):
            pass
        elif isinstance(m, lc.Returned):
            d = self.debrief()
            if isinstance(d, lc.OnReturned):
                d(self, m)
            continue
        elif isinstance(m, lc.Faulted):
            return m
        elif isinstance(m, lc.Stop):
            return lc.Aborted()
        else:
            continue

        # Callback for on_return.
        def respond(self, response, args):
            self.send(lc.cast_to(response, self.returned_type), args.return_address)

        a = self.create(lc.GetResponse, m, worker_address)
        self.on_return(a, respond, return_address=self.return_address)

lc.bind(server)

if __name__ == '__main__':
    lc.create(server)
There are two points of interest;
worker_address = self.create(worker)
a = self.create(lc.GetResponse, m, worker_address)
An instance of worker() is created during startup and its address saved as worker_address. Rather than sending the requests directly to that address there is now a create(), passing GetResponse as a parameter. This special library facility forwards the given message to the specified address and waits for a response. This somewhat convoluted approach allows for continued use of the callback mechanism. Without the presence of GetResponse the worker would send the response directly to the server and there would be no Returned message to drive the callback machinery.
Note
Developers familiar with event-driven software will recognise the role that GetResponse
plays
in this scenario. It is the equivalent of an entry in a pending request table. Within the layer-cake
framework there is no need to allocate ids, match responses with requests and update the table. This happens
as a natural by-product of delegating to an independent, asynchronous object. Further information about
management of complex request scenarios can be found here.
On receiving a message the GetResponse
facility terminates, passing the message back to the server inside a Returned
message. The standard processing of callbacks occurs resulting in the call to respond()
and a send()
of the table back
to the proper client.
The per-request creation of platform threads (i.e. instances of texture()) has been replaced with one-off creation of a worker().
Distributing Load Across Multiple Workers
Adoption of worker() has reduced interactions with the platform but has also resulted in the return of a familiar problem. All requests must pass through the single thread that has been assigned to the instance of worker(). Concurrency has been lost.
A pool of workers is needed along with the code to distribute the requests across the pool. Adding this capability to the previous implementation is trivial;
# test_server_4.py
import layer_cake as lc
from test_api import Xy, table_type
from test_worker_4 import worker

DEFAULT_ADDRESS = lc.HostPort('127.0.0.1', 5050)
SERVER_API = [Xy,]

def server(self, server_address: lc.HostPort=None):
    server_address = server_address or DEFAULT_ADDRESS

    # Open a network port for HTTP clients, e.g. curl.
    lc.listen(self, server_address, http_server=SERVER_API)
    m = self.input()
    if not isinstance(m, lc.Listening):
        return m

    # Start a collection of workers.
    worker_spool = self.create(lc.ObjectSpool, worker)

    # Run a live network service.
    while True:
        m = self.input()
        if isinstance(m, Xy):
            pass
        elif isinstance(m, lc.Returned):
            d = self.debrief()
            if isinstance(d, lc.OnReturned):
                d(self, m)
            continue
        elif isinstance(m, lc.Faulted):
            return m
        elif isinstance(m, lc.Stop):
            return lc.Aborted()
        else:
            continue

        # Callback for on_return.
        def respond(self, response, args):
            self.send(lc.cast_to(response, self.returned_type), args.return_address)

        a = self.create(lc.GetResponse, m, worker_spool)
        self.on_return(a, respond, return_address=self.return_address)

lc.bind(server)

if __name__ == '__main__':
    lc.create(server)
Rather than creating an instance of a worker() there is now the creation of an ObjectSpool. This library facility uses the remaining arguments to create a collection of worker() instances. The number of workers can be specified as a parameter, e.g. object_count=16. The default is 8.
worker_spool = self.create(lc.ObjectSpool, worker)
The worker_spool
variable is used in exactly the same manner as the worker_address
was used. Internally the requests are distributed
across the workers. Running the latest server looks like;
$ python3 test_server_4.py -dl=DEBUG
00:39:57.196 + <0000000e>ListenConnect - Created by <00000001>
00:39:57.196 < <0000000e>ListenConnect - Received Start from <00000001>
00:39:57.196 > <0000000e>ListenConnect - Sent SocketChannel to <00000001>
00:39:57.196 + <0000000f>ObjectDirectory[INITIAL] - Created by <00000001>
...
00:39:57.197 < <00000012>server - Received Listening from <0000000e>
00:39:57.197 + <00000013>ObjectSpool[INITIAL] - Created by <00000012>
00:39:57.197 < <00000013>ObjectSpool[INITIAL] - Received Start ...
00:39:57.197 + <00000014>worker - Created by <00000013>
00:39:57.197 + <00000015>worker - Created by <00000013>
00:39:57.197 + <00000016>worker - Created by <00000013>
00:39:57.197 + <00000017>worker - Created by <00000013>
00:39:57.198 + <00000018>worker - Created by <00000013>
00:39:57.198 + <00000019>worker - Created by <00000013>
00:39:57.198 + <0000001a>worker - Created by <00000013>
00:39:57.198 + <0000001b>worker - Created by <00000013>
Logs show the spool being populated with multiple instances of the worker(). After multiple requests using the curl client, the associated logs look like;
00:40:03.529 < <00000012>server - Received Xy from <0000001c>
...
00:40:03.529 > <0000001d>GetResponse - Sent Xy to <00000013>
00:40:03.529 < <00000013>ObjectSpool[SPOOLING] - Received Xy ...
...
00:40:03.529 > <0000001e>GetResponse - Sent Xy to <00000014>
00:40:03.529 < <00000014>worker - Received Xy from <0000001e>
...
00:42:12.500 < <00000012>server - Received Xy from <0000001c>
...
00:42:12.500 > <0000001d>GetResponse - Sent Xy to <00000013>
00:42:12.500 < <00000013>ObjectSpool[SPOOLING] - Received Xy ...
...
00:42:12.501 > <00000022>GetResponse - Sent Xy to <00000014>
00:42:12.501 < <00000015>worker - Received Xy from <00000022>
The line containing;
<00000014>worker - Received Xy
is followed by the same line but with a different id;
<00000015>worker - Received Xy
This illustrates the distribution of requests among the workers.
There is now concurrency courtesy of the multiple workers. There is also a fixed number of platform threads assigned to the server and the one-time cost of creating those threads is incurred at startup time. It is possible to tune the number of workers to suit the deployment environment.
Operation Of A Spool
An operational spool consists of a collection of workers, a request queue and a few configuration parameters. On receiving a request the spool locates an available worker and forwards the request. A callback is registered (i.e. on_return()) for the processing of the response. Load distribution is round-robin, as availability allows. If a worker is not available the request is appended to the queue.
During execution of a callback the queue is checked. A non-empty queue results in the forwarding of the oldest, deferred request. Availability of a worker is guaranteed as the worker that triggered the callback, has just become available.
There are five operational parameters that can be set at creation time;
* object_count
* size_of_queue
* responsiveness
* busy_pass_rate
* stand_down
There is explicit control over the number of workers, the maximum number of queued requests and the expected performance of the workers, expressed as a maximum time between presentation of a request and receiving the response.
An average response time is calculated across a number of the most recent requests. When this average exceeds the given
response time, the spool is considered busy. In this state it uses the busy_pass_rate
to reject a percentage of the inbound
requests, e.g. busy_pass_rate=10
says that one tenth of received requests will be processed and the remainder rejected.
The few requests that do pass through to a worker()
are needed to recover normal operation, i.e. they cause updates
to the average performance metric and therefore the busy status of the spool.
Both size_of_queue and responsiveness can be set to None, disabling the associated behaviour. If the former is None the queue is never considered full and if the latter is None the workers are never judged to be busy.
A stand_down
of None
disables the recovery of workers and the failure of a single worker will cause the termination
of the entire spool. Improbable parameters are rejected at startup time.
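As a hedged sketch (the values are illustrative only, and responsiveness is assumed to be expressed in seconds, like stand_down), the parameters are supplied when the spool is created;
# A sketch only; parameter values are illustrative.
worker_spool = self.create(
    lc.ObjectSpool, worker,
    object_count=16,        # number of worker instances (default 8)
    size_of_queue=64,       # maximum deferred requests (None removes the limit)
    responsiveness=2.0,     # expected response time before the spool is considered busy
    busy_pass_rate=10,      # while busy, pass roughly 1 in 10 requests through to a worker
    stand_down=30.0,        # delay (plus or minus up to 25%) before replacing a failed worker
)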
When a new request encounters a full condition the spool responds immediately with an Overloaded message. All clients of a spool should check the response they receive; the Overloaded and Busy messages derive from the Faulted message.
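As a hedged sketch of such a check (the exact handling is illustrative), the respond() callback from the server could test for a fault before casting and forwarding the table;
# A sketch only; Overloaded and Busy both derive from Faulted, so a single test covers them.
def respond(self, response, args):
    if isinstance(response, lc.Faulted):
        self.send(response, args.return_address)    # pass the fault back to the client
        return
    self.send(lc.cast_to(response, self.returned_type), args.return_address)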
In the event that a worker terminates and depending on the value of stand_down, the spool replaces it with a fresh instance. It inserts a randomized delay into this processing to avoid any unfortunate synchronization. The delay applied is stand_down seconds plus or minus up to 25%.