Distribution With Multihosting

Note

To follow the materials mentioned in this section, change to the layer-cake-demos/multihosting folder.

The multithreading and multiprocessing demonstrations have focused on the implementation of concurrency within the scope of a single process. A process that starts and manages sub-processes to their completion still falls within that definition of a single process.

To make use of physically distributed computing resources there needs to be multihosting, where the network service is no longer a single process. Instead it consists of a logically related collection of processes, each of which may be running on any host within a network. For the demonstration network service, this would bring the ultimate in concurrency. Each worker in the pool could be running on its own physical platform with its own set of computing resources.

The layer-cake library provides both traditional networking, through the listen() and connect() functions, and publish-subscribe networking (or just pubsub). To really showcase what layer-cake can do, this final section on concurrency will focus on the latter.

Pubsub is implemented as the publish() and subscribe() pair of functions, and a small set of networking components; group-cake, host-cake and lan-cake. At least one of these components must be present for pubsub to work. The use of the first is effectively automated, while the latter two need to be installed as part of the operational environment. They play a background role similar to DHCP or dynamic DNS. Installation requires a few trivial commands, and then all operation is discreet. The most difficult aspect of pubsub is the placement of lan-cake. Further detail appears in the following sections.

A Session With A Published Service

Registering a worker as a service looks like;

# test_worker_9.py
import layer_cake as lc
from test_api import Xy, table_type
from test_function_9 import texture

def worker(self):
    lc.publish(self, 'test-multihosting:worker-9')
    m = self.input()
    if not isinstance(m, lc.Published):
        return m

    while True:
        m = self.input()
        if isinstance(m, Xy):
            pass
        elif isinstance(m, lc.Faulted):
            return m
        elif isinstance(m, lc.Stop):
            return lc.Aborted()
        else:
            continue

        table = texture(x=m.x, y=m.y)
        self.send(lc.cast_to(table, table_type), self.return_address)

lc.bind(worker)

if __name__ == '__main__':
    lc.create(worker)

The interesting line of code is;

lc.publish(self, 'test-multihosting:worker-9')

Rather than listening at a particular network IP and port, this worker() is advertising its presence under a string name. By convention the name is structured as a series of fields separated by colons. The worker() is defined with both thread and process entry-points.

Establishing a subscriber session from the server() looks like;

# test_server_9.py
import layer_cake as lc
from test_api import Xy, table_type

DEFAULT_ADDRESS = lc.HostPort('127.0.0.1', 5050)
SERVER_API = [Xy,]

def server(self, server_address: lc.HostPort=None):
    server_address = server_address or DEFAULT_ADDRESS

    lc.listen(self, server_address, http_server=SERVER_API)
    m = self.input()
    if not isinstance(m, lc.Listening):
        return m

    lc.subscribe(self, 'test-multihosting:worker-9')
    m = self.input()
    if not isinstance(m, lc.Subscribed):
        return m

    worker_spool = self.create(lc.ObjectSpool, None)

    while True:
        m = self.input()
        if isinstance(m, Xy):
            pass
        elif isinstance(m, lc.Returned):
            d = self.debrief()
            if isinstance(d, lc.OnReturned):
                d(self, m)
            continue
        elif isinstance(m, lc.Available):
            self.send(lc.JoinSpool(m.publisher_address), worker_spool)
            continue
        elif isinstance(m, lc.Dropped):
            self.send(lc.LeaveSpool(m.remote_address), worker_spool)
            continue
        elif isinstance(m, lc.Faulted):
            return m
        elif isinstance(m, lc.Stop):
            return lc.Aborted()
        else:
            continue

        # Callback for on_return.
        def respond(self, response, args):
            self.send(lc.cast_to(response, self.returned_type),
                args.return_address)

        a = self.create(lc.GetResponse, m, worker_spool)
        self.on_return(a, respond, return_address=self.return_address)

lc.bind(server)

if __name__ == '__main__':
    lc.create(server)

The first line of interest is;

lc.subscribe(self, 'test-multihosting:worker-9')

Rather than attempting to connect to a particular network IP and port, this server() is registering interest in a string name. There is also the special definition of the spool;

worker_spool = self.create(lc.ObjectSpool, None)

This spool receives the addresses of its workers from an external source, indicated by the passing of a None;

elif isinstance(m, lc.Available):
    self.send(lc.JoinSpool(m.publisher_address), worker_spool)
    continue

At some point the server receives notification that the named service is available. The server updates the spool with the new information. A matching procedure occurs around the loss of a service, i.e. on receiving a Dropped message the spool is directed to forget the specified worker.
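
The corresponding fragment from the listing above;

elif isinstance(m, lc.Dropped):
    self.send(lc.LeaveSpool(m.remote_address), worker_spool)
    continue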

To run this implementation enter the following commands;

$ layer-cake create
$ layer-cake add test_server_9.py server
$ layer-cake add test_worker_9.py worker

This creates a small hierarchy of sub-folders and files in the .layer-cake folder. To run all the processes described in that folder, use this command line;

$ layer-cake run --debug-level=DEBUG
<0000000e>ListenConnect - Created by <00000001>
<0000000e>ListenConnect - Received Start from <00000001>
<0000000e>ListenConnect - Sent SocketChannel to <00000001>
<0000000f>ObjectDirectory[INITIAL] - Created by <00000001>
...
<00000012>layer_cake - Created by <00000011>
<00000012>layer_cake - run (.../multihosting/.layer-cake)
<00000013>head_lock - Created by <00000012>
<00000013>head_lock - Sent Ready to <00000012>
<00000012>layer_cake - Received "Ready" from <19>
...
<00000015>ProcessObject[INITIAL] - Created by <00000012>
<00000015>ProcessObject[INITIAL] - Received Start from <00000012>
<00000015>ProcessObject[INITIAL] - .../group-cake .../multihosting/.layer-cake
...
<00000013>ProcessObject[INITIAL] - Created by <00000012>
<00000014>ProcessObject[INITIAL] - Created by <00000012>
<00000013>ProcessObject[INITIAL] - Received Start from <00000012>
<00000013>ProcessObject[INITIAL] - .../python3 .../test_server_9.py ...
...
<00000014>ProcessObject[INITIAL] - Received Start from <00000012>
<00000014>ProcessObject[INITIAL] - .../python3 .../test_worker_9.py ...
...
<00000013>server - Received Available from <00000016>
<00000013>server - Sent JoinSpool to <00000014>
<00000014>ObjectSpool[SPOOLING] - Received JoinSpool from <00000013>

Stepping through the logs, it is possible to see the layer-cake process starting the group-cake process, and the group-cake process then starting the server and worker processes. Confirm that the server has found the worker and that the worker is being put to use by the spool;

$ curl -s 'http://127.0.0.1:5050/Xy?x=2&y=2'
{
    "value": [
        "vector<vector<float8>>",
        [
            [
                0.5647838146363222,
                0.5596026171995564
            ],
            [
                0.1567212327148707,
                0.7033970937636289
            ]
        ],
        []
    ]
}

The layer-cake CLI tool is, among other things, a process orchestration tool. It provides sub-commands for describing a set of processes and sub-commands for initiating those processes, the result of which is known as a composite process. This concept is strengthened by the discreet inclusion of group-cake, which provides the supporting pubsub machinery to bring the server() and worker() together.

Both publish() and subscribe() are about entering networking information into the pubsub machinery. There is no expectation that subscribing will produce an immediate indication of whether a connection has been created. Connections occur when matching parties are both present. Even though use of the layer-cake tool ensures that both processes are started quickly, there is no guaranteed ordering, i.e. the subscribing may occur before the publishing. With pubsub, the non-deterministic nature of startup order is of no consequence.
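
The server() above already copes with this; Subscribed only acknowledges that the interest has been registered, and the publisher address arrives later in an Available message, whenever a match occurs. A minimal sketch of that ordering-tolerant pattern;

lc.subscribe(self, 'test-multihosting:worker-9')
m = self.input()
if not isinstance(m, lc.Subscribed):
    return m                            # Not the expected acknowledgement.

while True:
    m = self.input()
    if isinstance(m, lc.Available):     # A matching publisher has appeared,
        break                           # whether it started before or after
                                        # this subscriber.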

Connecting To Multiple Instances Of A Service

The obvious approach to connecting multiple workers would be to create multiple processes, each registering a different configured name. The server() would then need a matching call to subscribe() for each of those names. Happily, only a few minor changes are needed. Registration of a worker needs upgrading;

# test_worker_10.py
import uuid
import layer_cake as lc
from test_api import Xy, table_type
from test_function_10 import texture

def worker(self):
    tag = uuid.uuid4()
    lc.publish(self, f'test-multihosting:worker-10:{tag}')
    m = self.input()
    if not isinstance(m, lc.Published):
        return m

    while True:
        m = self.input()
        if isinstance(m, Xy):
            pass
        elif isinstance(m, lc.Faulted):
            return m
        elif isinstance(m, lc.Stop):
            return lc.Aborted()
        else:
            continue

        table = texture(x=m.x, y=m.y)
        self.send(lc.cast_to(table, table_type), self.return_address)

lc.bind(worker)

if __name__ == '__main__':
    lc.create(worker)

The interesting line of code is;

lc.publish(self, f'test-multihosting:worker-10:{tag}')

The name has been augmented with a UUID as the trailing field. Every instance of this worker() is automatically announced to the network under a unique name. Establishing a client session from the server() now looks like;

# test_server_10.py
import layer_cake as lc
from test_api import Xy

DEFAULT_ADDRESS = lc.HostPort('127.0.0.1', 5050)
SERVER_API = [Xy,]

def server(self, server_address: lc.HostPort=None):
    server_address = server_address or DEFAULT_ADDRESS

    lc.listen(self, server_address, http_server=SERVER_API)
    m = self.input()
    if not isinstance(m, lc.Listening):
        return m

    lc.subscribe(self, r'test-multihosting:worker-10:[-a-f0-9]+')
    m = self.input()
    if not isinstance(m, lc.Subscribed):
        return m

    worker_spool = self.create(lc.ObjectSpool, None)

    while True:
        m = self.input()
        if isinstance(m, Xy):
            pass
        elif isinstance(m, lc.Returned):
            d = self.debrief()
            if isinstance(d, lc.OnReturned):
                d(self, m)
            continue
        elif isinstance(m, lc.Available):
            self.send(lc.JoinSpool(m.publisher_address), worker_spool)
            continue
        elif isinstance(m, lc.Dropped):
            self.send(lc.LeaveSpool(m.remote_address), worker_spool)
            continue
        elif isinstance(m, lc.Faulted):
            return m
        elif isinstance(m, lc.Stop):
            return lc.Aborted()
        else:
            continue

        # Callback for on_return.
        def respond(self, response, args):
            self.send(lc.cast_to(response, self.returned_type),
                args.return_address)

        a = self.create(lc.GetResponse, m, worker_spool)
        self.on_return(a, respond, return_address=self.return_address)

lc.bind(server)

if __name__ == '__main__':
    lc.create(server)

The first line of interest is;

lc.subscribe(self, r'test-multihosting:worker-10:[-a-f0-9]+')

This server() is registering interest in any name matching a pattern. The trailing field is a regular expression that will generally match the text version of a UUID.
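
A quick standard-library check (illustrative only, not part of layer-cake) that the pattern does match the text form of a UUID;

import re
import uuid

pattern = r'test-multihosting:worker-10:[-a-f0-9]+'
name = f'test-multihosting:worker-10:{uuid.uuid4()}'

# str(uuid.uuid4()) is lowercase hex digits and hyphens, so the trailing
# field is matched by [-a-f0-9]+ and the whole name matches the pattern.
assert re.fullmatch(pattern, name) is not None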

To try out this new arrangement;

$ layer-cake destroy
$ layer-cake create
$ layer-cake add test_server_10.py server
$ layer-cake add test_worker_10.py worker --role-count=8

An initial destroy command deletes the previous definition of the composite process. The add command accepts a --role-count parameter that is used to add multiple instances of the same module. Decoration of the instance name with an ordinal number is automated;

$ layer-cake list --long-listing
server      /home/.../multihosting/test_server_10.py 4/7/500
worker-0    /home/.../multihosting/test_worker_10.py 4/7/502
worker-1    /home/.../multihosting/test_worker_10.py 4/7/502
worker-2    /home/.../multihosting/test_worker_10.py 4/7/502
worker-3    /home/.../multihosting/test_worker_10.py 4/7/502
worker-4    /home/.../multihosting/test_worker_10.py 4/7/502
worker-5    /home/.../multihosting/test_worker_10.py 4/7/502
worker-6    /home/.../multihosting/test_worker_10.py 4/7/502
worker-7    /home/.../multihosting/test_worker_10.py 4/7/502

Go ahead and run this latest service;

$ layer-cake run --debug-level=DEBUG
<0000000e>ListenConnect - Created by <00000001>
<0000000e>ListenConnect - Received Start from <00000001>
...
<00000012>layer_cake - run (...,home_path=.../multihosting/.layer-cake)
<00000013>head_lock - Created by <00000012>
<00000013>head_lock - Sent Ready to <00000012>
<00000012>layer_cake - Received "Ready" from <19>
...
<0000001c>ProcessObject[INITIAL] - Created by <00000012>
<0000001c>ProcessObject[INITIAL] - Received Start from <00000012>
<0000001c>ProcessObject[INITIAL] - .../group-cake ... .../.layer-cake
<0000001c>ProcessObject[INITIAL] - Started process (1559661)
...
<00000012>Group[INITIAL] - Created by <00000011>
<00000012>Group[INITIAL] - Received Start from <00000011>
...
<0000000e>ListenConnect - Listening on "127.0.0.1:43745", ...
...
<00000013>ProcessObject[INITIAL] - Created by <00000012>
<00000014>ProcessObject[INITIAL] - Created by <00000012>
<00000015>ProcessObject[INITIAL] - Created by <00000012>
<00000016>ProcessObject[INITIAL] - Created by <00000012>
...
<00000013>ProcessObject[INITIAL] - Received Start from <00000012>
<00000013>ProcessObject[INITIAL] - .../python3 .../test_worker_10.py ...
...
<00000014>ProcessObject[INITIAL] - .../python3 .../test_server_10.py ...

The logs show the server() being notified of the presence of a worker() and the information being passed on to the spool. This process is repeated the expected number of times. A view of the hierarchy of processes created by the layer-cake run command is available through the layer-cake network command. The proper use of this command is described in the following section.

A final implementation of multihosting has been included, i.e. test_server_11.py. Load testing of the service highlighted those areas that struggled as load increased. Generally these could be tuned away using configuration values in the network service. Under extreme load the network stack will shut down the listen, resulting in a NotListening message arriving at the server(). This final implementation takes a more careful approach to termination, performing a managed termination of the spool and the subscription. See the following section for notes on configuring a group for automatic restarts.

Clients That Fan-Out To Multiple Servers

Through the use of a search pattern in the subscriber, a one-to-many relationship is created between the server() and a set of workers(). Achieving a similar connection graph using the traditional listen() and connect() functions would be difficult for several reasons. The simplest approach might be to have the workers connect to the central server, but this couples the worker() to that specific relationship. In the pubsub solution the workers have no specific knowledge of who is establishing a session. Future subscribers can make connections without disturbing existing relationships, and without requiring any code changes in the worker().

Arranging for the server() to connect to the workers() would involve an address for every worker() instance, making configuration and maintenance more difficult. There would need to be some means of notifying the server() of changes to the set of known addresses.

Connecting To Multiple Hosts

At this point there is no more coding to be done. Courtesy of pubsub networking, the latest version of worker() can be deployed anywhere on a network and the server() will find it. However, successful operation will require some initial, one-time setup.

The next level of pubsub is provided by host-cake. The presence of this component enables a wider range of networking scenarios, but still within the boundary of a single host. Assuming that the previous demonstration of group-cake is still running, enter the following command in a separate shell;

$ host-cake --debug-level=DEBUG

The existing group-cake process automatically connects to the new host-cake process. All the service information it is holding is pushed up to the new process. Open another shell and enter the following command;

$ python3 test_worker_10.py --debug-level=DEBUG

The new worker() instance is immediately added to the pool of workers. This demonstrates pubsub without the presence of group-cake in the sense that this application process connects directly to the host-cake process.

To verify that all the pieces of your software solution are properly installed into the publish-subscribe machinery, use the layer-cake network command;

(.env) toby@seneca:~/../multihosting$ layer-cake network
[HOST] host-cake (9e7db6d6)
+   [GROUP] group-cake (02f461b3)
+   +   [PROCESS] test_worker_10.py (873dd3e9)
+   +   [PROCESS] test_server_10.py (565edd3c)
+   +   [PROCESS] test_worker_10.py (edf78eec)
+   +   [PROCESS] test_worker_10.py (4c68fcd9)
+   +   [PROCESS] test_worker_10.py (a516934c)
+   +   [PROCESS] test_worker_10.py (834198d9)
+   +   [PROCESS] test_worker_10.py (2002ab92)
+   +   [PROCESS] test_worker_10.py (daf02dd7)
+   +   [PROCESS] test_worker_10.py (3773514d)
+   [PROCESS] test_worker_10.py (95440db7)
+   [PROCESS] layer-cake (ba2eb859)

This shows the composite process (group-cake), the standalone test_worker_10.py and the layer-cake CLI, all making connections to the local instance of host-cake. There is extensive information available through the network command, including a listing of all current subscriber-to-publisher sessions.

There can be any number of composite processes (i.e. group-cake) and application processes connecting to the local host-cake. As demonstrated, once host-cake is in place this community of processes requires zero networking configuration. The local host should be configured with the following commands;

$ cd <operational-folder>
$ layer-cake create
$ layer-cake add host-cake
$ layer-cake update group --retry='{"regular_steps": 30.0}'

The update command is used to configure a restart of host-cake in 30 seconds, in the event that it terminates. Other members of the retry argument (i.e. RetryIntervals) are available to randomize the delay. At boot-time the host should execute the following command;

$ cd <operational-folder>
$ layer-cake start

The next level of pubsub support is provided by lan-cake. Setup at this level is a bit more involved, especially if the operational environment is a strictly controlled network. The new lan-cake process needs to be located on a machine by itself; more accurately, it cannot share a machine with an application process such as test_server_10.py.

The simplest scenario for deployment of the lan-cake process is to configure it to run at boot-time on a dedicated host. This might be an appropriate use of an SBC, e.g. a Raspberry Pi. Otherwise this is the least likely scenario; given the low resource requirements of the lan-cake process, a dedicated machine is probably a squandering of computing power.

The next option is to configure the process to run at boot-time on a dedicated virtual machine, e.g. using VirtualBox. This provides the separation that the process needs from all application processes without the cost of dedicated hardware. The physical host should be configured to start the virtual machine at boot-time.

Lastly, the lan-cake process may be installed alongside other server-room software, on a pre-existing host within the operational network.

If at all practicable, the chosen host should be assigned the standard layer-cake LAN IP address. This results in an environment where every layer-cake process involved in inter-host networking can proceed with zero configuration. This applies to all host-cake, group-cake and application processes that ever run within the target network.

The standard layer-cake LAN IP is derived from the private address range in use, the primary IP of the local host, a defined station number (195) and a defined port number (54195), i.e.

  • 10.0.0.195

  • 172.16.0.195

  • 192.168.0.195

The starting point is the primary IP of the local host. This is matched against the possible private address ranges and the final octet is replaced with the station number. Intervening octets are set to the base values for that range.
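
A sketch of that derivation using only the standard library (illustrative only; this helper is not part of layer-cake);

import ipaddress

STATION = 195       # Defined station number; the defined port is 54195.

PRIVATE_RANGES = [
    ipaddress.ip_network('10.0.0.0/8'),
    ipaddress.ip_network('172.16.0.0/12'),
    ipaddress.ip_network('192.168.0.0/16'),
]

def standard_lan_ip(primary_ip: str) -> str:
    ip = ipaddress.ip_address(primary_ip)
    for r in PRIVATE_RANGES:
        if ip in r:
            # Intervening octets take the base values for the range and
            # the final octet is replaced with the station number.
            base = r.network_address.packed
            return str(ipaddress.ip_address(base[:3] + bytes([STATION])))
    raise ValueError(f'{primary_ip} is not in a private address range')

# standard_lan_ip('192.168.1.106') -> '192.168.0.195'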

If the standard layer-cake LAN IP cannot be used then every connecting process must be started with a command like;

$ python3 test_worker_10.py --connect-to-directory='{"host": "10.0.0.133", "port": 29101}'

Installing and configuring host-cake appropriately on every operational host is one strategy for reducing the potential for related problems;

$ layer-cake update host-cake --directory-at-lan='{"host": "10.0.0.133", "port": 29101}'

Instances of group-cake and application processes on this host will connect to this host-cake and thereby join the directory rooted at the specified address. Once a lan-cake machine has been designated and assigned the proper network address, it also needs to be configured with the following commands;

$ cd <operational-folder>
$ layer-cake create
$ layer-cake add lan-cake
$ layer-cake update group --retry='{"regular_steps": 30.0}'

At boot-time the host should execute the following command;

$ cd <operational-folder>
$ layer-cake start

A Distributed, Hierarchical Directory

Conceptually, the layer-cake directory is a tree with group-cake, host-cake and lan-cake at the nodes and application processes as the terminal leaves. A lan-cake node is at the root of the tree (i.e. the top of the hierarchy). A directory with all elements present, and with pubsub sessions displayed, looks like;

(.env) toby@seneca:~/../multihosting$ layer-cake network --list-connected
[LAN] lan-cake (f1a042b8)
+   [HOST] host-cake (45199baf)
+   +   [PROCESS] test_worker_10.py (87c00a45)
+   [HOST] host-cake (fcf744be)
+   +   [PROCESS] test_server_10.py (0ae5c793)
+   +   +   ? "test-multihosting:worker-10:[-a-f0-9]+" (4d0ddfee)
+   +   +   +   > "test-multihosting:worker-10:aefbd788-007e-4e0b-b43c-920820bb9c1e"[PROCESS] (4d0ddfee -> 8611e17e)
+   +   +   +   > "test-multihosting:worker-10:0be7bd59-0216-431f-9177-c66d470bfbf9"[LAN] (192.168.1.106:54828 -> 192.168.1.13:39097)
+   +   +   +   > "test-multihosting:worker-10:f91a0a51-a0da-487a-a4aa-85b5d8eee9fd"[PROCESS] (4d0ddfee -> b0e651d3)
+   +   +   ? "test_worker_10" (8d3454e3)
+   +   +   +   > "test_worker_10"[PROCESS] (8d3454e3 -> 87e86ddc)
+   +   +   [LIBRARY] test_worker_10.py (7abbd14c)
+   +   [PROCESS] layer-cake (d5be138d)

The test_server_10.py has opened three > sessions to matching publications. Two are local and the third is with a worker running on the host at 192.168.1.13. The second listing of a single > session is related to the presence of the library. The session marked with (8d3454e3 -> 87e86ddc) records the transport used when sending messages to the ProcessObject that are routed to the library process.

Installation and configuration of the directory is mostly automated. The items that cannot be automated are;

  • installation of host-cake,

  • determining the host for lan-cake,

  • determining the IP address for the lan-cake host,

  • installation of lan-cake.

These are all one-time operations performed on an as-needed basis; if you are not multihosting then there is no need for lan-cake. Composite processes (i.e. using group-cake) can happily operate as private, standalone directories without the need for other directory components.

The layer-cake directory provides service to any layer-cake process. This means that the one-time installation and configuration of the service will support the operation of multiple networking solutions, side-by-side. This also applies to multiple instances of the same solution, e.g. developers can work on their own private instances of a distributed solution by adopting an appropriate naming convention. All without concerns about duplicate assignment of IP addresses and port numbers, or misconfiguration.
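
One possible naming convention (purely illustrative) is to add the developer's login as an extra field, in both the published names and the matching subscription, keeping each private instance separate within the shared directory;

import getpass
import uuid

me = getpass.getuser()
tag = uuid.uuid4()

# Publisher side (fragment, after test_worker_10.py).
lc.publish(self, f'test-multihosting:{me}:worker-10:{tag}')

# Subscriber side (fragment, after test_server_10.py).
lc.subscribe(self, rf'test-multihosting:{me}:worker-10:[-a-f0-9]+')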

Less obvious benefits derive from the fact that all the network address information pertaining to the solution is updated the moment that anything changes.

Pubsub enables the initial installation and startup of the different components in a solution. This can happen in any order and over an extended period (e.g. phased rollout).

Components of the solution, such as the test_worker_10.py, can be added and deleted as required. Investing in a cluster of machines and adding those computing resources (e.g. instances of workers) to a live system is fully automated.

Components are free to move around. A replacement service can be created on a new host. When ready, the old service is shut down and the replacement is started. All reconnections to the new address are automated and immediate.

Lastly, solutions can be rearranged across any collection of hosts including a lone host. For development purposes the ability to run an entire solution as a single composite process might be advantageous and is always an option. It is also possible to generate portable images of composite processes that just need the appropriate Python environment to run. Copy it to a laptop for demonstrations, sales or training.

Both the publish() and subscribe() functions accept a scope parameter;

lc.publish(self, 'super-system:log-store', scope=lc.ScopeOfDirectory.GROUP)

Service information is not propagated beyond its declared scope. Even with connectivity through host-cake or lan-cake processes, subscribers outside the group cannot see super-system:log-store and cannot establish a session.

Where no scope is specified, the default is HOST. For full, automated matching of all subscribers to their intended services, this value might have been set to LAN. However, that could easily lead to unintended polling in the search for a lan-cake that will never be installed, and to inadvertent service leaks, i.e. access to a service that was never intended to be widely available.

Services with the same name can be registered within the wider directory. The name super-system:log-store can be registered with GROUP scope in multiple groups within a LAN, but there can only be one instance of a name published to a given scope. There can only be a single instance of home-automation:power-supply at the LAN scope.
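
For example, the single LAN-wide instance would be published explicitly at that scope (a fragment, assuming a LAN member of ScopeOfDirectory alongside the GROUP member shown earlier);

lc.publish(self, 'home-automation:power-supply', scope=lc.ScopeOfDirectory.LAN)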

Pubsub behaves in a manner similar to symbol lookup in programming languages. The instance of super-system:log-store at GROUP scope has precedence over the instance registered at LAN scope; the GROUP instance is considered to be “nearer”.

Over time the different instances of services will come and go, as a consequence of events such as network outages and software updates. This creates decision points for a subscriber;

  • the nearest instance of super-system:log-store crashes but there is another instance at LAN scope,

  • the connected instance is at LAN scope but the instance at GROUP has restarted.

Layer cake implements two specific responses to these different scenarios. Firstly, there is fallback operation, where an existing connection to the nearest instance is lost and another instance is registered in the wider directory. A new session is immediately established with the alternate instance. The subscriber receives a sequence of Dropped and Available messages.

The second response is to upgrade an existing session. This is the reverse of the events described in the previous paragraph, i.e. the “nearest” instance subsequently recovers. The subscriber receives another sequence of Dropped and Available messages and finds itself messaging with the new instance at the original scope.
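
Neither scenario requires special handling in the subscriber; the Available and Dropped branches already present in server() process both sequences, with each Available carrying the publisher address of the instance the directory has now matched;

elif isinstance(m, lc.Available):
    self.send(lc.JoinSpool(m.publisher_address), worker_spool)
    continue
elif isinstance(m, lc.Dropped):
    self.send(lc.LeaveSpool(m.remote_address), worker_spool)
    continue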

Recovery of instances at a wider scope than an established session does not affect the established session.