Multithreading in Python: the threading module

by Alex

Modern software is designed so that its functions and tasks can run in parallel. Python gives the programmer a powerful set of tools for working with threads in the standard threading module.

How threading works

Multithreading is the execution of a program in several threads at once, each performing part of its work at the same time. Multithreaded programming is easy to confuse with multiprocess programming. The concepts are similar, but in the first case the program works with threads, and in the second with processes. The difference between threads and processes is simple: threads share memory, so changes made in one thread are visible to the others, while processes each use their own memory area.

On a single-core processor, operations from different threads are not actually executed in parallel. A single core can perform only one operation at a time, but because the operations are very fast, an impression of parallel execution, pseudo-parallelism, is created. Truly parallel programs can run only on multicore processors, where each core executes operations independently of the others.

A classic example of multithreading is a program in which GUI drawing and user input processing are handled by different threads. If both tasks lived in a single thread, drawing the interface would be interrupted every time the program received input from the user. Using two threads lets these functions run independently of each other. However, a multithreaded program running on a single-core processor will be slower than a single-threaded equivalent, because extra time and memory are spent creating and managing the threads.

Can threading be considered multithreading?

Python uses the GIL (Global Interpreter Lock), which allows only one thread to execute Python bytecode at a time. All threads created with threading run under this single lock, so at any given moment they are handled by only one core. Genuinely simultaneous work on several physical processor cores is out of the question.

And since threading runs on only one processor core, it gives no speed advantage; on the contrary, the overhead of managing the threads slows the program down.

But you can’t do without it if you need to execute several tasks at once:

  • Process a button press in a GUI, for example with Tkinter. If a button press triggers a lot of time-consuming work, that work should be performed in another thread so that the GUI does not freeze in the meantime. Typically the button is disabled when the work starts and enabled again as soon as the thread finishes its computation.
  • If our program works simultaneously with several connected devices, for example devices attached to different COM ports.
  • If we download files from the network and process already downloaded files at the same time.
  • And so on.
If we need our program to run on several physical processor cores at the same time, we should pay attention to another module: multiprocessing.

What, then, are the advantages of the threading module compared to multiprocessing? Let us consider them:

  • Ease of use.
  • It is easier to transfer data from a thread to the main program; you can even use global variables. But in that case the program must be designed carefully to avoid “race condition” errors, which we will consider below.
So, if our program will run on a single-core computer, or the processor load is not heavy, threading is the better choice.

Importing the threading module

Threading is a standard module that comes with the interpreter, so there is nothing to install; you simply import it:

import threading

Threads are managed through instances of the Thread class. To run code in a separate thread, create an instance of the class and call its start() method. Here is an example:

import threading

def myfunc(a, b):
    print('sum :', a + b)

thr1 = threading.Thread(target = myfunc, args = (1, 2))
thr1.start()
print('main thread')

main thread
sum : 3

Here we started the myfunc function in a separate thread, passing the numbers 1 and 2 to it as arguments.

threading.Thread()

This construct creates a new thread by instantiating the Thread class. This is what its arguments look like:

threading.Thread(group=None, target=None, name=None, args=(),
                 kwargs={}, *, daemon=None)

Let’s take a closer look at them:

  • group. Has the value None and is reserved for future extension when a ThreadGroup class is implemented.
  • target. The callable that the run() method invokes in the new thread; if None is passed, nothing is called.
  • name. The thread name; by default it takes the value “Thread-X”, where X is a decimal number. The programmer can specify the name manually.
  • args. A tuple of positional arguments passed to the function to be called.
  • kwargs. A dictionary of keyword arguments passed to the function.
  • daemon. Sets whether the thread is a daemon. The default value is None, in which case the daemon property is inherited from the current thread; the programmer can also set it explicitly (see the sketch after this list).
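
As a quick illustration of these parameters, here is a small sketch (the function and the thread name are made up for the example) that combines target, args, kwargs, name and daemon:

import threading

def greet(greeting, name='world'):
    print(greeting, name)

thr = threading.Thread(target=greet,
                       args=('Hello',),           # positional arguments
                       kwargs={'name': 'Python'}, # keyword arguments
                       name='GreeterThread',      # readable thread name
                       daemon=True)
thr.start()
print(thr.name)   # GreeterThread
thr.join()        # wait so the daemon thread can print before the program exits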

Daemons

Daemons are processes that run in the background. In Python the word has a more specific meaning: a daemon thread. Unlike normal threads, a daemon thread does not keep the program alive; the program will not wait for daemon threads to terminate, and when it exits these threads are destroyed, whatever state they are in. Daemon threads are used for operations that run in an infinite loop; in other cases ordinary threads are usually used, which delay closing the program until all their work is complete. Daemon threads suit background work that does not involve changing or saving long-term data. For example, if a program completely overwrites the contents of a file and the overwriting is implemented in a daemon thread, the data will be lost if the program exits unexpectedly. GUI drawing functions are often placed in daemon threads: drawing the interface is an endless process that should stop right after the program exits, and if it were placed in a normal thread it would prevent the program from closing.
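
A minimal sketch of this behavior (the function and timings are illustrative): the daemon thread below is killed as soon as the main thread finishes, even though its loop never ends.

import threading
import time

def background_task():
    while True:                 # endless background loop
        print('working...')
        time.sleep(0.5)

# With daemon=False (the default) this program would never exit,
# because the interpreter would wait for the endless loop to finish.
thr = threading.Thread(target=background_task, daemon=True)
thr.start()
time.sleep(1)
print('main thread is done, the daemon thread dies with it')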

Methods for working with threads

Various methods of the Thread class are used to create and manage threads. With them you can easily manipulate several threads at once and define their behavior.

start()

This is used to start a thread that has been created. After using threading.Thread(), a new thread is created, but it is inactive. The start() method is used to start it.

import threading

def myfunc(a, b):
    print('sum :', a + b)

thr1 = threading.Thread(target = myfunc, args = (1, 2))
thr1.start()

Here, until we call the start method, the myfunc function will not run.

join()

This method blocks the thread that calls it until the thread whose join() was called finishes. That is, if thread1 calls thread2.join(), thread1 is suspended until thread2 finishes executing. You can use this method to make the program wait for a daemon thread to finish: for example, if you call it in the main thread, the program will not exit until the daemon thread completes. The join() method has a timeout argument. It defaults to None, but the programmer can pass it a floating-point number of seconds.

If the argument is left at its default value, the calling thread is paused until the joined thread finishes.

If you pass a number, join() sets a timeout; when it expires, the calling thread continues its work. For example, thr1.join(100) means waiting no more than 100 seconds for thread thr1 to finish. Since join() always returns None, to find out whether the thread managed to finish within the timeout you have to check is_alive(). Let’s look at an example:

import threading
import time
def myfunc(a, b):
    time.sleep(2.5)
    print('sum :', a + b)
thr1 = threading.Thread(target = myfunc, args = (1, 2), daemon=True)
thr1.start()
thr1.join(0.125)
if thr1.is_alive():
    print('the thread failed to complete')
else:
    print('computation completed')

the thread failed to complete

Here we make the thread a daemon so that the program does not have to wait for the function to finish. We import the time module to put a 2.5-second delay in the function. After starting the thread, join(0.125) pauses the main thread for at most 0.125 seconds. Then we check is_alive(): if it returns True, the thread has not managed to finish within those 0.125 seconds.

run()

This method defines the operations performed by the thread. It is overridden when you create your own thread class by subclassing Thread. Example:

import threading as th
import time
class Thr1(th.Thread): # Subclass Thread
    def __init__(self, var):
        th.Thread.__init__(self)
        self.daemon = True # Specify that this thread is a daemon
        self.var = var # this is the interval passed as an argument

    def run(self): # the method that is executed when the thread starts
        num = 1
        while True:
            y = num*num + num / (num - 10) # Compute function
            num += 1
            print("When num =", num, " function y =", y) # Print the result
            time.sleep(self.var) # Wait for the specified number of seconds
x = Thr1(0.9)
x.start()
time.sleep(2)

When num = 2  function y = 0.8888888888888888
When num = 3  function y = 3.75
When num = 4  function y = 8.571428571428571

is_alive()

This method checks whether a thread is currently running. It is often used together with the join() method. It can also be used to manage daemon threads so that they are not cut off abruptly when the program closes, for example:

# thr1 is a daemon thread that was started earlier
while True:
    if thr1.is_alive():   # check whether the daemon thread is still running
        time.sleep(1)     # if so, wait 1 second and check again
    else:
        break             # if not, exit the loop and let the program close

Stopping a thread

There are situations when you need to stop a thread that is running in the background. Suppose the thread runs an endless loop in its run function and the main program has to stop it. The easiest approach is to create a flag variable, for example called stop, and then:

  • In the infinite loop, check the flag constantly and leave the loop as soon as it becomes True.
  • Avoid calls inside the loop that can block for a long time; always use a timeout.

Here is an example of such a program:

import threading

stop = False

def myfunc():
    while not stop:           # spin until the main program raises the flag
        pass

thr1 = threading.Thread(target = myfunc)
thr1.start()
stop = True
while thr1.is_alive():        # wait until the thread notices the flag and exits
    pass
print('thread ended')

Here we use the global variable stop. When we need to stop a thread, we set it to True and just wait for it to finish.
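
A common variation of the same idea (a sketch, not part of the example above) is to replace the bare flag with a threading.Event object, which is covered later in this article; its wait() call doubles as an interruptible sleep, so the loop does not have to spin:

import threading

stop_event = threading.Event()

def worker():
    while not stop_event.is_set():
        stop_event.wait(0.1)    # do a chunk of work here; wait() returns early once set() is called

thr = threading.Thread(target=worker)
thr.start()
stop_event.set()                # signal the thread to stop
thr.join()
print('thread ended')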

Race condition

A race condition is an error that occurs when a multithreaded program is designed incorrectly. It appears when several threads access the same data: for example, a variable stores a number that thread1 and thread2 try to change at the same time, which leads to unpredictable results or an error. A common situation is when one thread checks the value of a variable as a condition for performing some action, but between the check and the action a second thread intervenes and changes the value, producing an incorrect result, for example:

x = 5
# Thread 1: checks the condition and thinks it is true
if x == 5:
    # Thread 2: changes the value of the variable before thread 1 acts
    x = 1
    # Thread 1: performs the action, now with the wrong value
    print("If x = 5 function 2*x =", 2 * x)

As a result the program prints "If x = 5 function 2*x = 2". A race condition can lead to various problems:

  • Memory leaks.
  • Data loss.
  • Security vulnerabilities.
  • Incorrect results.
  • Deadlocks between threads.
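
To make the race concrete, here is a small sketch (my own illustration, not code from the text above) in which two threads update a shared counter without any protection; the read and the write are separate steps, so another thread can slip in between them and increments get lost:

import threading

counter = 0

def increment():
    global counter
    for _ in range(100000):
        tmp = counter       # read
        counter = tmp + 1   # write: another thread may have changed counter in between

threads = [threading.Thread(target=increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Expected 200000; depending on thread switching, the result is often smaller
print('counter =', counter)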

Shared resource access (lock)

To prevent a race condition, use a threading.Lock(), which keeps several threads from working with the same data at once. In other words, Lock protects data from simultaneous access. threading.Lock() returns an object that, figuratively speaking, is the door to a room that locks while someone is inside: if a thread has acquired the Lock (entered the room), any other thread has to wait until the first one releases it (leaves the room). The resulting object has two methods, acquire() and release(). Let’s consider them.

acquire()

This method lets a thread acquire the lock. It has two arguments: blocking and timeout. When called with blocking set to True (the default), it acquires the lock and returns True; if the object is already locked, the calling thread pauses and waits until the lock is released, then locks it itself. When called with blocking=False, the method locks an unlocked object and returns True, but if the Lock is already locked it does nothing and returns False. The timeout argument (default -1) only has an effect when blocking is True: a positive value means the thread waits at most that many seconds to acquire the lock, while the default value means it waits indefinitely.

release()

This method releases the Lock object. The interpreter allows it to be called from any thread, not only the one that currently holds the lock. The method returns nothing and raises a RuntimeError if called when the Lock object is already unlocked. Here is an example:

import threading
lock = threading.Lock()
x = 'Python 2'
# ...
lock.acquire()
x = 'Python 3'
print(x)
lock.release()

Python 3

Here we create a lock object with which we will safely read and modify data. The data we protect in this example is the single variable x. The code shows how to change the data safely: first acquire() waits for our turn to access the data, then we change it (overwriting the value of the variable from “Python 2” to “Python 3”) and print it to the console, and finally release() gives other threads access again. If every thread that needs access to x uses the lock, a race condition can be avoided.
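
As a follow-up sketch (again my own illustration), the unprotected counter from the race-condition section can be fixed by taking the lock around every update:

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        lock.acquire()      # wait for exclusive access to counter
        counter += 1
        lock.release()      # let the other thread in

threads = [threading.Thread(target=increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print('counter =', counter)  # always 200000 now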

Deadlock

Using lock brings a serious problem of its own, one that can stop the program from working altogether. If acquire() is called while the Lock object is already locked, the calling thread waits until the thread that locked the object calls release(). If a single thread calls acquire() several times in a row, its execution is suspended until it calls release() itself; but it cannot call release(), because it is suspended, so the program is blocked forever. Such self-locking can be prevented by removing the extra acquire() call, but that is not always possible. Self-locking can occur because of the following things:

  • Errors that leave a Lock locked.
  • Incorrect program design, where a function that holds the lock calls another function that tries to acquire the same lock.

To handle errors, it is enough to use the try-finally construct or the with statement. Here is an example using with:

lock = threading.Lock() 
with lock:
    # operators
    pass

The try-finally construct lets you release the lock even if an error occurs, thus avoiding a deadlock. Example:

lock = threading.Lock()
lock.acquire()
try:
    # operators
    pass
finally:
    lock.release()

The try-finally construct ensures that the code in finally is always executed, regardless of errors or the result of the try block. However, it does not help against self-locking caused by improper program design. The RLock object was created for that case.

RLock

If a Lock is locked, it blocks any thread that tries to acquire it again, even if that thread is the one currently holding the lock. For example, suppose the programmer wrote this code:

import threading

data = list(range(10))   # a hypothetical shared object whose two halves we sum
lock1 = threading.Lock()

def part1():
    lock1.acquire()
    try:
        # calculate the sum of the elements of the first part of the object
        result = sum(data[:len(data) // 2])
    finally:
        lock1.release()
    return result

def part2():
    lock1.acquire()
    try:
        # calculate the sum of the elements of the second part of the object
        result = sum(data[len(data) // 2:])
    finally:
        lock1.release()
    return result

def both_parts():
    p1 = part1()
    p2 = part2()
    return p1, p2

This code works, but there is a problem: when both_parts() is called, it calls part1() and part2() in turn, and between those two calls another thread can get at the data and change it. What if the data must not change while both_parts is running? To solve that, we have to acquire lock1 inside both_parts as well. Let’s rewrite it:

def both_parts():
    lock1.acquire()
    try:
        p1 = part1()
        p2 = part2()
    finally:
        lock1.release()
    return p1, p2

The idea is simple: the outer both_parts holds the lock while part1 and part2 run, and each of those functions also takes the lock to sum its part of the object. However, a Lock object will not allow this: the code hangs the program completely, because it makes no difference to Lock where in the thread acquire() was called. RLock blocks a thread only when the object is held by a different thread, so with RLock a thread can never lock itself out. Use RLock to control nested access to shared objects. To fix the code above, it is enough to replace the line “lock1 = threading.Lock()” with “lock1 = threading.RLock()”.

Also, keep in mind that while it is possible to call acquire() multiple times, the release() method must be called the same number of times. Each call to acquire() increases the recursion level by one, and each call to release() decreases it by one.
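
A minimal single-threaded sketch of the recursion level:

import threading

rlock = threading.RLock()

rlock.acquire()      # recursion level 1
rlock.acquire()      # the same thread may acquire again: level 2
# ... work with shared data ...
rlock.release()      # back to level 1
rlock.release()      # fully released; other threads may now acquire it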

Transmitting Data Using Queues

To transfer data between threads with queues, the Queue class from the queue library is used; it is imported with “from queue import Queue”. The queue library contains all the tools needed for passing data between threads and implements the necessary locking itself.

The Queue class implements a FIFO queue: the first element put into the queue is the first to come out of it. The queue can be pictured as a vertical hollow pipe into which elements are dropped from above. Queue has a maxsize parameter, which accepts only integer values and sets the maximum number of elements the queue can hold; when the maximum is reached, adding elements blocks until room becomes free. If maxsize is <= 0, the queue is unbounded.

The threading module’s Event object can be used together with queues: it lets a thread perform the desired operations when it receives a signal from another thread, and the waiting thread does not necessarily have to pause its work while it waits. The following methods are used to transfer data and work with queues (they apply to all kinds of queues, not just Queue):

qsize()

Returns the approximate size of the queue. It is important to understand two things:

  • If qsize() is greater than zero, a subsequent get() can still block.
  • If qsize() is less than maxsize, a subsequent put() can still block.

This can happen because other threads may access the queue and take or add data right after you check its size.

empty()

The method checks whether the queue is empty: it returns True if it is and False if it contains elements. As with qsize(), the returned value does not guarantee that a subsequent put() or get() will not block.

full()

Checks whether the queue is full: returns True if it is, otherwise False. As with the previous methods, the returned value does not guarantee that put() and get() will not block.
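
A quick single-threaded sketch of these three inspection methods (keep in mind that in a multithreaded program their answers may already be stale by the time you act on them):

from queue import Queue

q = Queue(maxsize=2)
print(q.empty(), q.full(), q.qsize())   # True False 0

q.put('a')
q.put('b')
print(q.empty(), q.full(), q.qsize())   # False True 2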

put()

The method puts a new object into the queue. It has one mandatory argument, item, and two optional arguments: block=True and timeout=None.

Queue.put(item, block=True, timeout=None)

Depending on these arguments, waiting for a place in the queue behaves differently:

  • If the block argument is True and timeout is None, the object to be put into the queue will wait indefinitely for free space.
  • If timeout is greater than zero, put() waits no longer than the specified number of seconds; if no free space appears in that time, an exception is raised.
  • If block is False, timeout is ignored and the item is queued only if there is free space available immediately; otherwise an exception is raised.

Here’s an example of creating a queue in Python and adding an item to it:

from queue import Queue
queue1 = Queue()
x = 'some data'
queue1.put(x)

put_nowait()

Equivalent to a call to put(item, False). That is, it puts the item into the queue only if there is room, otherwise it throws an exception.

get()

Deletes and returns an item from the queue. Has two optional arguments: block = True and timeout = None.

Queue.get(block=True, timeout=None)

Depending on the values of the arguments, waiting for an item behaves differently:

  • If the arguments have a default value, the method waits for the object from the queue until it is available.
  • If timeout is a positive number, the method waits for an item for at most that many seconds, after which an exception is raised.
  • If block is False, the item is only returned if it is available, otherwise an exception is thrown (the timeout argument is ignored).

Here’s an example. Here we add a line to the queue. Then we fetch it and print it to the console:

from queue import Queue
queue1 = Queue()
queue1.put('Python 3')
value = queue1.get()
print(value)

Python 3

get_nowait()

Equivalent to calling get(False).
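
A short sketch of the two non-blocking variants; queue.Full and queue.Empty are the exceptions raised when the operation cannot complete immediately:

from queue import Queue, Full, Empty

q = Queue(maxsize=1)
q.put_nowait('first')            # fits, the queue was empty

try:
    q.put_nowait('second')       # no room left, Full is raised immediately
except Full:
    print('queue is full')

print(q.get_nowait())            # first

try:
    q.get_nowait()               # nothing left, Empty is raised immediately
except Empty:
    print('queue is empty')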

task_done()

This method works together with the queue’s join() method: it indicates that a previously fetched task has been processed. After getting each item from the queue, say with get(), you need to call task_done() to decrease the counter of unfinished tasks. The join() method is described below with an example. If task_done() is called more times than there were items placed in the queue, a ValueError exception is raised.

join()

Blocks the calling thread until all elements in the queue have been received and processed. Every time a new element is added to the queue, the counter of unfinished tasks is incremented; when task_done() is called, the counter decreases, indicating that processing of an item is complete and the next one can be taken. When the counter reaches zero, the thread is unblocked. Here is an example:

from queue import Queue 
queue1 = Queue() 
queue1.put('Python 2')
queue1.put('Python 3')
print(queue1.get())
queue1.task_done()
print(queue1.get())
queue1.task_done()
queue1.join()

This example only shows how join() and task_done() work; everything happens in a single thread here. Usually one thread puts data into the queue and then waits with join() for it to be processed, while another thread calls task_done() as it handles each value it receives.

An example of a program

The program’s purpose is simple: students submit their final papers for checking. The papers go to two professors, and the time it takes to check a paper depends on the student’s rating (the higher the rating, the better the work and the less time the check takes). Program:

from queue import Queue
import time, datetime, threading

students = [(99, "Andrew"),
            (76, "Alexander"),
            (75, "Nikita"),
            (72, "Evgeny"),
            (66, "Alexei"),
            (62, "Sergey"),
            (50, "Mikhail")]

def student(q):
    while True:
        # get a job from the queue
        check = q.get()
        # print the time the work was submitted
        print(check[1], 'submitted at', datetime.datetime.now()
              .strftime('%H:%M:%S'))
        # checking time depends on the rating
        time.sleep((100 - check[0]) / 5)
        # print the time the checked work was returned
        print(check[1], 'picked up the job at', datetime.datetime.now()
              .strftime('%H:%M:%S'))
        # signal that the job from the queue is done
        q.task_done()

# create a queue
q = Queue()
# load the students into the queue
for x in students:
    q.put(x)
# create and start the threads
thread1 = threading.Thread(target=student, args=(q,), daemon=True)
thread2 = threading.Thread(target=student, args=(q,), daemon=True)
thread1.start()
time.sleep(10)
thread2.start()
# block execution until all jobs are finished
q.join()
print("This text will print when the lock is over")

Results:

Andrew submitted at 18:58:35
Andrew picked up the job at 18:58:36
Alexander submitted at 18:58:36
Alexander picked up the job at 18:58:40
Nikita submitted at 18:58:40
Evgeny submitted at 18:58:45
Nikita picked up the job at 18:58:45
Alexei submitted at 18:58:45
Evgeny picked up the job at 18:58:51
Sergey submitted at 18:58:51
Alexei picked up the job at 18:58:52
Mikhail submitted at 18:58:52
Sergey picked up the job at 18:58:59
Mikhail picked up the job at 18:59:02
This text will print when the lock is over

As you can see from the results, the students’ papers were processed in two threads, by two professors. q.join() blocked the execution of the main thread, so the final text was printed only after all the tasks in the queue were finished.

Useful tools of the threading module

Threading has a few more tools that come in handy for more specialized tasks.

Semaphore

This is one of the oldest synchronization primitives in the history of computer science. A Semaphore uses an internal counter that is decreased by each call to acquire() and increased by each call to release(). The counter cannot drop below zero: when it reaches zero, acquire() blocks the calling thread until some other thread calls release(). Here is an example:

import threading 
x = 'Python'
sem = threading.Semaphore()
sem.acquire()
x = 'Python 2'
sem.release()
with sem:
    x = 'Python 3'

Here are two ways to access the data:

  • With acquire and release.
  • Using with.
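
The example above uses the default counter of 1. To show the counting behavior, here is a sketch (worker names and timings are illustrative) that limits the number of threads working at the same time to two:

import threading
import time

sem = threading.Semaphore(2)   # internal counter starts at 2

def worker(n):
    with sem:                  # acquire(): decrements the counter, blocks at zero
        print('worker', n, 'started')
        time.sleep(1)
        print('worker', n, 'finished')
    # leaving the with block calls release() and increments the counter

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()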

Timer

This class allows you to control the timing of an action. Timer is a subclass of Thread. Here are its arguments:

threading.Timer(interval, function, args=None, kwargs=None)

Timers are started like threads, with the start() method. They can be stopped using the cancel() method. With a timer, the programmer can call a function, assign a value to a variable, or, for example, start a thread at a certain time. Example usage:

import threading
def myfunc():
    print('tick-tack')
timer = threading.Timer(4, myfunc)
timer.start()

tick-tack

Here myfunc will execute 4 seconds after the start() method is called.
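
Since cancel() was mentioned above, here is a tiny sketch of stopping a timer before it fires:

import threading

def myfunc():
    print('tick-tack')

timer = threading.Timer(4, myfunc)
timer.start()
timer.cancel()     # cancelled before the 4 seconds elapse, so nothing is printed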

Barrier

This class allows you to implement a simple mechanism for synchronizing threads. It can be used for a fixed number of threads when you want each thread to wait for some action to be performed by all of them. In order to continue execution, all threads must call the wait() method; if at least one thread has not done so, the rest are blocked until the method is called. This is what its arguments look like:

threading.Barrier(parties, action=None, timeout=None)

Let’s look at a sample use case:

import threading
import time
barrier = threading.Barrier(2)
def myfunc():
    barrier.wait()
    print('barrier worked')
thr1 = threading.Thread(target = myfunc)
thr1.start()
time.sleep(1)
print('main thread')
barrier.wait()

main thread
barrier worked

I set the barrier here to 2 calls of wait(). So for the code after wait() to run, wait() has to be called in 2 threads. The myfunc function starts executing in its thread immediately, but it does not print 'barrier worked' right away: it waits until wait() is also called in the main thread.

Event

Event is a simple mechanism for communication between threads: one thread signals an event and the others wait for it. The Event object manages an internal flag that can be set to True or False using the set() and clear() methods. There is also an is_set() method for checking the state of the flag. With the wait(timeout=None) method you can block until the flag is set to True, and you can limit the waiting time if needed. Here is an example:

import threading
import time
event = threading.Event()
def myfunc():
    time.sleep(1)
    event.set()
thr1 = threading.Thread(target = myfunc)
thr1.start()
print(event.is_set())
event.wait()
print(event.is_set())
event.clear()
print(event.is_set())

False
True
False

Conclusion

Threading in Python is a powerful tool for developing large programs. It is provided by the threading module, often used together with the queue library. Every Python programmer should know how to work with threads and queues and understand how locking, data access and data transfer between threads work.
