Hey, guys! It’s time to solve a typical interview challenge. This post and the next one will guide you through the creation of a simple, fully tested chat server using the Python programming language.

Let’s start directly from the statement of the problem.

Write a very simple chat server that listens on TCP port 10000 for multiple client connections and broadcasts each incoming client message to all the other connected clients. The program should be fully tested too.

Reading carefully, we can see the challenge is twofold. Indeed, in order to solve it, we need to:

  1. write the code for a chat server able to broadcast messages to all the clients;
  2. write the code for testing the chat server created at the previous point.

So the chat server must be written with testability in mind, because the second part of the challenge requires us to test it.

As far as I know*, there are two basic ways to solve this kind of problem with Python. The former combines pure Python with socket programming for the implementation of the chat server and uses the standard unit testing tools, i.e. the unittest module, for the creation of the tests. The latter uses Twisted to build the chat server and twisted.trial for testing purposes. In this post, we’ll leave Twisted aside for a moment and see how to implement a simple Pythonic chat server with socket programming. In the next post, we’ll dive into the testing of the chat server we have created.

Let’s start!

Google is always a tempting place to look for solutions to our problems.


Anyway, in this case, if you try googling for an answer you won’t get what you are really looking for. In particular, among the very first results you will find BinaryTides and bogotobogo. BinaryTides provides a solution which is entirely contained in the if __name__ == "__main__" block. bogotobogo, on the other hand, shows a slightly improved solution: here, the code has been structured so that the chat server is wrapped in a function. Is this sufficient for us? Unfortunately, no.

What’s the main idea then?

Ideally, we want the chat server to be an object. Furthermore, the chat server object should have two main public methods:

  • a start method, which actually runs the chat server;
  • a stop method, which stops the current execution of the chat server.

Why do we need an object with a start and a stop method?

A unit test is a fully self-contained piece of software. What does it do once it is executed? Basically, it sets up the context of the test, performs the checks for which it has been developed and, finally, cleans up the context. This makes the reason for the start and the stop methods clear: we can’t have the chat server under test running indefinitely, so we need both a way to start it and a way to stop it once the checks are completed. This is also why the solutions provided by BinaryTides and bogotobogo don’t fit the requirements of the problem statement.
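
To make the set-up, check and clean-up cycle concrete, here is a minimal sketch based on the unittest module. It is written for Python 3, and StoppableWorker is a hypothetical stand-in for a server (it is not the ChatServer built in this post): the only thing it shares with it is the pair of start and stop methods.

```python
import threading
import unittest


class StoppableWorker(threading.Thread):
    """Hypothetical stand-in for a server: runs until stop() is called."""

    def __init__(self):
        super().__init__()
        self._stop_requested = threading.Event()

    def run(self):
        # Wakes up periodically to check whether a stop was requested
        while not self._stop_requested.wait(timeout=0.05):
            pass

    def stop(self):
        self._stop_requested.set()


class StoppableWorkerTest(unittest.TestCase):

    def setUp(self):
        # Sets up the context: starts the worker in its own thread
        self.worker = StoppableWorker()
        self.worker.start()

    def tearDown(self):
        # Cleans up the context: stops the worker and waits for it to finish
        self.worker.stop()
        self.worker.join(timeout=2)

    def test_worker_is_running(self):
        # Performs the check while the worker is alive
        self.assertTrue(self.worker.is_alive())
```

Without a stop method, tearDown would have no way to end the thread and the test run would never terminate.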

Let’s dive deep into the solution!

The script is quite complex, so we’re going to analyse it step by step and finally we’ll look at the overall solution.

Lines 1-8 (line numbers refer to the full listing at the end of this post) simply import the necessary packages and define two main constants: _HOST and _PORT. These constants represent, respectively, the host and the port to which the server socket will be bound.

import math
import struct
import socket
import select
import threading

_HOST = '127.0.0.1'  # defines the host as "localhost"
_PORT = 10000        # defines the port as "10000"

Lines 10-30 define the ChatServer object as a threading.Thread. This means we’ll need to override the run() method, which the start() method of the parent class invokes in a new thread, in order to get the ChatServer running properly. At the beginning, three main class constants are declared for the ChatServer:

  • MAX_WAITING_CONNECTIONS specifies the maximum number of queued connections before new ones are refused.
  • RECV_BUFFER specifies the size (in bytes) of the receiving buffer.
  • RECV_MSG_LEN specifies the size (in bytes) of the placeholder at the beginning of each message, which holds the message length.

Why do we need a placeholder at the very beginning of each message?

Strictly speaking, the problem statement doesn’t require it. But think about a client sending a considerable amount of data. Let’s say the client sends a 10000-byte message while our server can receive at most 4096 bytes at a time. Now we’re facing a decision: we may simply assume the server collects only the first 4096 bytes of every client message, or we may devise a solution to retrieve the full 10000-byte message (and, in general, a message of any length). Let’s make things harder and try to implement a solution allowing for the complete receipt of every single message. Since TCP/IP is a stream-based protocol, we need to define our own message-based protocol on top of TCP/IP in order to distinguish the start and the end of each individual message. An easy and effective protocol, suggested in a Stack Overflow thread and applied here, expects the client to prefix each message with its length. That’s why we introduced the RECV_MSG_LEN constant.
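
The length-prefix idea can be tried out without any socket at all. The following is a minimal sketch written for Python 3; the names frame, split_frames and PREFIX_LEN are made up for this illustration and do not appear in the chat server code.

```python
import struct

PREFIX_LEN = 4  # size in bytes of the length prefix (RECV_MSG_LEN in the post)


def frame(payload):
    """Prefixes a bytes payload with its length as a 4-byte big-endian integer."""
    return struct.pack('>I', len(payload)) + payload


def split_frames(buffer):
    """Splits a byte stream into complete messages plus the unread remainder."""
    messages = []
    while len(buffer) >= PREFIX_LEN:
        (length,) = struct.unpack('>I', buffer[:PREFIX_LEN])
        end = PREFIX_LEN + length
        if len(buffer) < end:
            break  # the last message has not fully arrived yet
        messages.append(buffer[PREFIX_LEN:end])
        buffer = buffer[end:]
    return messages, buffer


# Two complete messages followed by the first 3 bytes of a third one
stream = frame(b'hello') + frame(b'x' * 10000) + frame(b'bye')[:3]
messages, rest = split_frames(stream)
print([len(m) for m in messages])  # [5, 10000]
print(rest)  # b'\x00\x00\x00'
```

Thanks to the prefix, the receiver always knows where one message ends and the next one begins, whatever the size of the chunks delivered by the network.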

class ChatServer(threading.Thread):
    """
    Defines the chat server as a Thread.
    """

    MAX_WAITING_CONNECTIONS = 10
    RECV_BUFFER = 4096
    RECV_MSG_LEN = 4

    def __init__(self, host, port):
        """
        Initializes a new ChatServer.

        :param host: the host to which the server is bound
        :param port: the port to which the server is bound
        """
        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.connections = []  # collects all the incoming connections
        self.running = True  # tells whether the server should run

Regarding the initialisation method, there are two relevant instance variables. The former is self.connections, which stores all the active connections (the server socket itself and the clients’ ones), while the latter is self.running, which allows us to start and stop the ChatServer at will.

Lines 32-40 define the _bind_socket function, which creates the server socket, binds it to the given host and port and listens for incoming client connections, queuing at most MAX_WAITING_CONNECTIONS of them.

    def _bind_socket(self):
        """
        Creates the server socket and binds it to the given host and port.
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(self.MAX_WAITING_CONNECTIONS)
        self.connections.append(self.server_socket)

Lines 42-52 define the convenience method _send, which prefixes a given message with its length before sending it through a given socket connection. It will be used by the server to broadcast a client message to all the other connected clients.

    def _send(self, sock, msg):
        """
        Prefixes each message with a 4-byte length before sending.

        :param sock: the socket the message is sent through
        :param msg: the message to send
        """
        # Packs the message with 4 leading bytes representing the message length
        msg = struct.pack('>I', len(msg)) + msg
        # Sends the packed message (sendall keeps sending until all of it is out)
        sock.sendall(msg)
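
To see what actually travels on the wire, here is a small Python 3 sketch. The send_message helper is a hypothetical name introduced for this example, and socket.socketpair is used only as a convenient way to get two connected sockets without starting a server. The sketch calls sendall, because a plain send is allowed to transmit only part of the buffer.

```python
import socket
import struct


def send_message(sock, payload):
    """Sends a bytes payload preceded by its 4-byte big-endian length."""
    # sendall keeps writing until the whole buffer has been handed to the OS
    sock.sendall(struct.pack('>I', len(payload)) + payload)


# socketpair returns two already-connected sockets, handy for experiments
left, right = socket.socketpair()
try:
    send_message(left, b'hi there')
    raw = b''
    # Reads until the 4-byte prefix and the 8-byte payload have both arrived
    while len(raw) < 4 + len(b'hi there'):
        raw += right.recv(4096)
finally:
    left.close()
    right.close()

print(raw)  # b'\x00\x00\x00\x08hi there'
```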

Lines 54-84 define the _receive function, which contains the logic for receiving data. According to the message protocol we’ve chosen to build over TCP/IP, each client message is prefixed by 4 bytes representing its length. So, for each incoming message, the server first reads these 4 bytes and unpacks them to get the length. Once this information has been acquired, the server calls the recv method as many times as needed to get the whole message.

    def _receive(self, sock):
        """
        Receives an incoming message from the client and unpacks it.

        :param sock: the incoming socket
        :return: the unpacked message, or None if the client disconnected
        """
        # Retrieves the 4 bytes of the length prefix from the message
        # (they may arrive in several pieces, so they are accumulated)
        header = ''
        while len(header) < self.RECV_MSG_LEN:
            part = sock.recv(self.RECV_MSG_LEN - len(header))
            if not part:
                # The client closed the connection
                return None
            header += part
        # Unpacks the prefix and gets the message length
        msg_len = struct.unpack('>I', header)[0]
        data = ''
        while len(data) < msg_len:
            # Retrieves a chunk of at most RECV_BUFFER bytes, without reading
            # past the end of the current message
            chunk = sock.recv(min(self.RECV_BUFFER, msg_len - len(data)))
            # If there isn't the expected chunk...
            if not chunk:
                data = None
                break # ... Simply breaks the loop
            else:
                # Merges the chunks content
                data += chunk
        return data
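
The same receiving logic can be exercised in isolation. The sketch below is written for Python 3 (so it works with bytes rather than Python 2 strings) and uses two hypothetical helpers, recv_exact and receive_message, which are not part of the chat server. It sends the 10000-byte message from the example above through a socket pair and reads it back in chunks of at most 4096 bytes.

```python
import socket
import struct
import threading

RECV_BUFFER = 4096


def recv_exact(sock, size):
    """Reads exactly `size` bytes, or returns None if the peer disconnects."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(min(RECV_BUFFER, remaining))
        if not chunk:
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def receive_message(sock):
    """Reads one length-prefixed message, or returns None on disconnection."""
    header = recv_exact(sock, 4)
    if header is None:
        return None
    (length,) = struct.unpack('>I', header)
    return recv_exact(sock, length)


client, server = socket.socketpair()
payload = b'a' * 10000
# Sends from a separate thread so that a full socket buffer cannot block us
sender = threading.Thread(
    target=client.sendall, args=(struct.pack('>I', len(payload)) + payload,))
sender.start()
received = receive_message(server)
sender.join()

# Once the client side is closed, recv returns an empty result
client.close()
after_close = receive_message(server)
server.close()

print(len(received))  # 10000
print(after_close)    # None
```

Returning None on an empty recv matters: an empty result means the other side has closed the connection, so looping again would never end.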

Lines 87-104 define the _broadcast method. Given both the socket of the sending client and its message, it iterates over the relevant socket connections in order to forward the message to all the other connected clients using the _send method. In addition, the method detects possible client disconnections and removes the sockets involved from the list of the active sockets.

    def _broadcast(self, client_socket, client_message):
        """
        Broadcasts a message to all the clients different from both the server itself and
        the client sending the message.

        :param client_socket: the socket of the client sending the message
        :param client_message: the message to broadcast
        """
        for sock in list(self.connections):  # a copy: the loop may remove items
            is_not_the_server = sock != self.server_socket
            is_not_the_client_sending = sock != client_socket
            if is_not_the_server and is_not_the_client_sending:
                try:
                    self._send(sock, client_message)
                except socket.error:
                    # Handles a possible disconnection of the client "sock" by...
                    sock.close()  # closing the socket connection
                    self.connections.remove(sock)  # removing the socket from the active connections list
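
Here is a self-contained Python 3 sketch of the same broadcasting idea. The broadcast function and the socket pairs are assumptions made for this illustration: each pair simulates one client, and one server-side socket is closed on purpose to force a send error.

```python
import socket


def broadcast(connections, sender, payload):
    """Sends payload to every socket but the sender, dropping broken ones."""
    # Iterates over a copy because the list may shrink inside the loop
    for sock in list(connections):
        if sock is sender:
            continue
        try:
            sock.sendall(payload)
        except OSError:
            sock.close()
            connections.remove(sock)


# Three hypothetical clients, each simulated by a pair of connected sockets:
# one end plays the server side, the other end plays the client
pairs = [socket.socketpair() for _ in range(3)]
server_sides = [pair[0] for pair in pairs]
client_sides = [pair[1] for pair in pairs]

# Forces a send error for the third client by closing its server-side socket
server_sides[2].close()

broadcast(server_sides, server_sides[0], b'hello')
remaining = len(server_sides)
received = client_sides[1].recv(4096)

# The sender must not get its own message back: nothing is waiting to be read
client_sides[0].setblocking(False)
try:
    client_sides[0].recv(4096)
    sender_got_copy = True
except BlockingIOError:
    sender_got_copy = False

for sock in server_sides + client_sides:
    sock.close()

print(remaining, received, sender_got_copy)  # 2 b'hello' False
```

Looping over a copy of the list is the important detail: removing items from a list while iterating over that same list can make the loop skip elements.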

Lines 106-147 define the _run method, which implements the overall server logic. As long as the server runs, it continuously monitors all the active sockets (contained in the self.connections list) for readable activity. This is done with the select.select call. The select function takes as input three lists of sockets that are waiting, respectively, for reading, for writing or for an exceptional condition and, in turn, returns the lists of sockets that are readable, writable or in an error state. The only list we care about is the list of readable sockets. If the server socket becomes readable, the server accepts and handles a new client connection. In addition, the server broadcasts to all the other connected clients a message telling them that a new client has entered the chat room. On the other hand, if a client socket becomes readable, the server reads the incoming message from the client and forwards it to all the other connected sockets (except for the sending one). The broadcast is performed via the _broadcast function previously described.

    def _run(self):
        """
        Actually runs the server.
        """
        while self.running:
            # Waits until at least one socket is readable; select blocks for at most
            # 60 seconds, after which the loop re-checks the "running" flag
            try:
                ready_to_read, ready_to_write, in_error = select.select(self.connections, [], [], 60)
            except socket.error:
                continue
            else:
                for sock in ready_to_read:
                    # If the socket instance is the server socket...
                    if sock == self.server_socket:
                        try:
                            # Handles a new client connection
                            client_socket, client_address = self.server_socket.accept()
                        except socket.error:
                            break
                        else:
                            self.connections.append(client_socket)
                            print "Client (%s, %s) connected" % client_address

                            # Notifies all the connected clients a new one has entered
                            self._broadcast(client_socket, "\n[%s:%s] entered the chat room\n" % client_address)
                    # ...else is an incoming client socket connection
                    else:
                        try:
                            data = self._receive(sock)  # Gets the client message...
                            if data:
                                # ... and broadcasts it to all the connected clients
                                self._broadcast(sock, "\r" + '<' + str(sock.getpeername()) + '> ' + data)
                        except socket.error:
                            data = None  # a broken connection counts as a disconnection
                        if data is None and sock in self.connections:  # the client has left
                            print "A client is offline"
                            self.connections.remove(sock)
                            sock.close()
                            self._broadcast(sock, "\nA client is offline\n")
        # Clears the socket connection
        self.stop()
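
The behaviour of select.select described above can be observed step by step with the following Python 3 sketch. It is a standalone experiment rather than part of the chat server, and it binds to port 0 so that the operating system picks a free port.

```python
import select
import socket

# Listening socket on an ephemeral port (port 0 lets the OS pick a free one)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 0))
server.listen(5)
connections = [server]

client = socket.create_connection(server.getsockname())

# 1) A pending connection makes the *server* socket readable
readable, _, _ = select.select(connections, [], [], 5)
server_was_readable = server in readable
conn, address = server.accept()
connections.append(conn)

# 2) Nothing to read yet: select returns empty lists once the timeout expires
readable, _, _ = select.select(connections, [], [], 0.1)
idle_readable = list(readable)

# 3) Incoming data makes the *client connection* readable
client.sendall(b'ping')
readable, _, _ = select.select(connections, [], [], 5)
data = conn.recv(4096) if conn in readable else None

for sock in (client, conn, server):
    sock.close()

print(server_was_readable, idle_readable, data)  # True [] b'ping'
```

Steps 1 and 3 correspond to the two branches of the loop in _run: a readable server socket means "accept a new client", while a readable client socket means "read a message".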

Lines 149-153 define the run method, which overrides the method of the same name in the parent class. It simply creates and binds the server socket using the _bind_socket function and then executes the _run method, which implements the server logic.

    def run(self):
        """Given a host and a port, binds the socket and runs the server.
        """
        self._bind_socket()
        self._run()

Lines 155-161 define the stop method, which simply sets the self.running variable to False and closes the server socket connection. This will allow us to stop our server once the execution of the test cases is finished.

    def stop(self):
        """
        Stops the server by setting the "running" flag before closing
        the socket connection.
        """
        self.running = False
        self.server_socket.close()
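
How quickly a stop request takes effect depends on how often the server loop looks at the running flag, which in turn depends on the select timeout. The following Python 3 sketch illustrates this with a stripped-down, hypothetical StoppableListener class (its poll_interval parameter is an assumption of this example and is not part of the ChatServer above).

```python
import select
import socket
import threading


class StoppableListener(threading.Thread):
    """Hypothetical, stripped-down server: accepts connections until stopped."""

    def __init__(self, host='127.0.0.1', port=0, poll_interval=0.1):
        super().__init__()
        self.poll_interval = poll_interval
        self.running = True
        # Binds in the constructor so the port is known before start()
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.bind((host, port))
        self.server_socket.listen(5)
        self.port = self.server_socket.getsockname()[1]

    def run(self):
        while self.running:
            # A short timeout bounds how long a stop() request goes unnoticed
            readable, _, _ = select.select(
                [self.server_socket], [], [], self.poll_interval)
            if readable:
                conn, _ = self.server_socket.accept()
                conn.close()
        # Closes the socket from the server thread, once the loop has ended
        self.server_socket.close()

    def stop(self):
        self.running = False


listener = StoppableListener()
listener.start()
socket.create_connection(('127.0.0.1', listener.port)).close()
listener.stop()
listener.join(timeout=2)
print(listener.is_alive())  # False
```

With a timeout of 0.1 seconds the thread ends almost immediately after stop() is called, which keeps the clean-up phase of a test short.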

Lines 164-175 define and execute the main function. It simply creates a ChatServer object and calls its start() method.

def main():
    """
    The main function of the program. It creates and runs a new ChatServer.
    """
    chat_server = ChatServer(_HOST, _PORT)
    chat_server.start()


if __name__ == '__main__':
    """The entry point of the program. It simply calls the main function.
    """
    main()

That’s it! Here is the full solution based on the threading module.

import math
import struct
import socket
import select
import threading

_HOST = '127.0.0.1'  # defines the host as "localhost"
_PORT = 10000        # defines the port as "10000"

class ChatServer(threading.Thread):
    """
    Defines the chat server as a Thread.
    """

    MAX_WAITING_CONNECTIONS = 10
    RECV_BUFFER = 4096
    RECV_MSG_LEN = 4

    def __init__(self, host, port):
        """
        Initializes a new ChatServer.

        :param host: the host to which the server is bound
        :param port: the port to which the server is bound
        """
        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.connections = []  # collects all the incoming connections
        self.running = True  # tells whether the server should run

    def _bind_socket(self):
        """
        Creates the server socket and binds it to the given host and port.
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(self.MAX_WAITING_CONNECTIONS)
        self.connections.append(self.server_socket)

    def _send(self, sock, msg):
        """
        Prefixes each message with a 4-byte length before sending.

        :param sock: the socket the message is sent through
        :param msg: the message to send
        """
        # Packs the message with 4 leading bytes representing the message length
        msg = struct.pack('>I', len(msg)) + msg
        # Sends the packed message (sendall keeps sending until all of it is out)
        sock.sendall(msg)

    def _receive(self, sock):
        """
        Receives an incoming message from the client and unpacks it.

        :param sock: the incoming socket
        :return: the unpacked message, or None if the client disconnected
        """
        # Retrieves the 4 bytes of the length prefix from the message
        # (they may arrive in several pieces, so they are accumulated)
        header = ''
        while len(header) < self.RECV_MSG_LEN:
            part = sock.recv(self.RECV_MSG_LEN - len(header))
            if not part:
                # The client closed the connection
                return None
            header += part
        # Unpacks the prefix and gets the message length
        msg_len = struct.unpack('>I', header)[0]
        data = ''
        while len(data) < msg_len:
            # Retrieves a chunk of at most RECV_BUFFER bytes, without reading
            # past the end of the current message
            chunk = sock.recv(min(self.RECV_BUFFER, msg_len - len(data)))
            # If there isn't the expected chunk...
            if not chunk:
                data = None
                break # ... Simply breaks the loop
            else:
                # Merges the chunks content
                data += chunk
        return data


    def _broadcast(self, client_socket, client_message):
        """
        Broadcasts a message to all the clients different from both the server itself and
        the client sending the message.

        :param client_socket: the socket of the client sending the message
        :param client_message: the message to broadcast
        """
        for sock in list(self.connections):  # a copy: the loop may remove items
            is_not_the_server = sock != self.server_socket
            is_not_the_client_sending = sock != client_socket
            if is_not_the_server and is_not_the_client_sending:
                try:
                    self._send(sock, client_message)
                except socket.error:
                    # Handles a possible disconnection of the client "sock" by...
                    sock.close()  # closing the socket connection
                    self.connections.remove(sock)  # removing the socket from the active connections list

    def _run(self):
        """
        Actually runs the server.
        """
        while self.running:
            # Waits until at least one socket is readable; select blocks for at most
            # 60 seconds, after which the loop re-checks the "running" flag
            try:
                ready_to_read, ready_to_write, in_error = select.select(self.connections, [], [], 60)
            except socket.error:
                continue
            else:
                for sock in ready_to_read:
                    # If the socket instance is the server socket...
                    if sock == self.server_socket:
                        try:
                            # Handles a new client connection
                            client_socket, client_address = self.server_socket.accept()
                        except socket.error:
                            break
                        else:
                            self.connections.append(client_socket)
                            print "Client (%s, %s) connected" % client_address

                            # Notifies all the connected clients a new one has entered
                            self._broadcast(client_socket, "\n[%s:%s] entered the chat room\n" % client_address)
                    # ...else is an incoming client socket connection
                    else:
                        try:
                            data = self._receive(sock)  # Gets the client message...
                            if data:
                                # ... and broadcasts it to all the connected clients
                                self._broadcast(sock, "\r" + '<' + str(sock.getpeername()) + '> ' + data)
                        except socket.error:
                            data = None  # a broken connection counts as a disconnection
                        if data is None and sock in self.connections:  # the client has left
                            print "A client is offline"
                            self.connections.remove(sock)
                            sock.close()
                            self._broadcast(sock, "\nA client is offline\n")
        # Clears the socket connection
        self.stop()

    def run(self):
        """Given a host and a port, binds the socket and runs the server.
        """
        self._bind_socket()
        self._run()

    def stop(self):
        """
        Stops the server by setting the "running" flag before closing
        the socket connection.
        """
        self.running = False
        self.server_socket.close()


def main():
    """
    The main function of the program. It creates and runs a new ChatServer.
    """
    chat_server = ChatServer(_HOST, _PORT)
    chat_server.start()


if __name__ == '__main__':
    """The entry point of the program. It simply calls the main function.
    """
    main()

* If you know another way to solve the challenge, get in touch with me!