luminal.managers.threads

Module Contents

Classes

TracedThread

Custom thread object that can be halted using the halt() function.

ThreadManager

A class for managing multiple threads in a codebase.

class TracedThread(*args, **keywords)

Bases: threading.Thread

Custom thread object that can be halted using the halt() function.

This thread type extends the standard threading.Thread class so that execution can be halted using the halt() function. Once halted, the thread stops at the next line event seen by its trace function and exposes any underlying pipes. This can be useful for debugging or similar tasks.

Important

This thread type is best utilized for small tasks that can be daemonized or terminated quickly.

__run()

The control function for the TracedThread that sets the global trace function and executes the thread’s main function.

Notes

  • This method is used to run the code in the thread, but with a global trace function set beforehand.

  • The globaltrace attribute, set previously by the user, is set as the current trace function at runtime.

  • The previous __run_backup() method, i.e. the original hardcoded run method, is called here to execute the thread’s main function.

  • The second to last line overwrites the run() method with the original method, effectively undoing any trace function set previously.

Return type:

None

start()

Starts the thread execution by invoking the Thread.start() method, but not before setting the appropriate run method.

Notes

  • This method overrides the run() method and replaces it with the __run() method, which sets the global trace function and executes the thread’s main function.

  • The original run method is saved into the __run_backup attribute before it is swapped out.

  • Finally, the threading.Thread.start() method is called on the thread object itself to actually start the execution of the code.

Return type:

None
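The run-swap described in the notes for start() and __run() can be sketched as follows. This is a minimal illustration, not the library's implementation: the class name RunSwapThread and the wrapper_ran attribute are hypothetical, and the point where the real __run() would install the trace function is only marked with a comment.

```python
import threading


class RunSwapThread(threading.Thread):
    """Hypothetical stand-in illustrating the run() swap performed by start()."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.wrapper_ran = False  # illustrative flag, not part of the documented API

    def start(self):
        # Save the original run() and install the wrapper before starting.
        self.__run_backup = self.run
        self.run = self.__run
        super().start()

    def __run(self):
        # The documented __run() would set the trace function at this point.
        self.wrapper_ran = True
        self.__run_backup()           # execute the thread's main function
        self.run = self.__run_backup  # restore the original run()


results = []
thread = RunSwapThread(target=results.append, args=("done",))
thread.start()
thread.join()
```

After join() returns, results holds the value appended by the target and wrapper_ran shows that the wrapper, not the original run(), was the thread's entry point.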

globaltrace(frame, event, arg)

The global trace function for a TracedThread that replaces the default function.

Parameters:
  • frame (Any) – The current thread frame.

  • event (Any) – The event type (e.g. ‘call’, ‘line’, ‘return’, ‘exception’).

  • arg (Any) – Event-specific arguments.

Returns:

If the event type is call, self.localtrace is returned, otherwise None is returned.

Return type:

Optional[Callable]

Notes

  • This method serves as the global trace function for the TracedThread class.

  • If the event type is call, the trace function switches to self.localtrace which handles the local tracing inside the thread in a context-specific manner.

  • If another event type is detected, None is returned as there’s no further action required.

localtrace(frame, event, arg)

Trace function for the TracedThread class.

Parameters:
  • frame (types.FrameType) – The current frame being executed in the thread.

  • event (str) – The event that occurred in the thread during tracing.

  • arg (Any) – Optional argument associated with the event.

Returns:

The next trace function to be called or None to stop tracing.

Return type:

Optional[Callable]

Raises:

SystemExit – If the thread is halted and the event is line, the thread is forcefully terminated.

Notes

  • This function can be registered as a trace function on a thread using sys.settrace(). It is called every time an event is encountered in the traced thread and returns the next trace function to be called. If the thread is halted and the event is line, it raises a SystemExit exception to stop the thread execution.

halt()

Halt function for the TracedThread class.

Notes

  • This function sets the halted attribute of the TracedThread instance to True, indicating that the thread should stop for debugging or other purposes. Once halted, the thread stops executing at the next available opportunity in its trace function.

  • This function is intended to be used together with the localtrace() and globaltrace() methods of TracedThread, which are registered through sys.settrace(). globaltrace() is installed as the thread's trace function and hands call events to localtrace(), which stops the thread at the next line event once halted is True.

Return type:

None
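The interplay of globaltrace(), localtrace() and halt() can be sketched as below. This is a simplified illustration under stated assumptions: the class name HaltableThread and the spin() worker are hypothetical, and the sketch overrides run() directly instead of swapping it inside start() as the documented class does. It relies on sys.settrace() affecting only the thread that calls it.

```python
import sys
import threading
import time


class HaltableThread(threading.Thread):
    """Hypothetical sketch of a thread that can be halted through tracing."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.halted = False

    def run(self):
        # Install the trace function for this thread only, then run the target.
        sys.settrace(self.globaltrace)
        super().run()

    def globaltrace(self, frame, event, arg):
        # Hand every new function call over to the local trace function.
        return self.localtrace if event == "call" else None

    def localtrace(self, frame, event, arg):
        # Once halted, abort the thread at the next traced line.
        if self.halted and event == "line":
            raise SystemExit()
        return self.localtrace

    def halt(self):
        self.halted = True


def spin():
    # Hypothetical worker that would otherwise never return.
    while True:
        time.sleep(0.01)


worker = HaltableThread(target=spin, daemon=True)
worker.start()
time.sleep(0.05)
worker.halt()
worker.join(timeout=2)
```

Because the check happens on line events, a thread blocked inside a long C-level call is only stopped once that call returns, which is one reason short tasks suit this thread type best.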

class ThreadManager

A class for managing multiple threads in a codebase.

This class provides functionality for managing and controlling the execution of multiple threads in a program. It maintains a list of running and requested threads, a flag to stop all threads gracefully, and settings for running threads based on given limits.

_generate_uid(delimiter='-')

Generates a unique identifier for the instance of the ThreadManager class.

Parameters:

delimiter (str) – The delimiter to use for concatenating the string chunks. Defaults to the - character.

Returns:

A unique identifier for the ThreadManager instance.

Return type:

str

Notes

  • This function generates a unique identifier for the ThreadManager instance from a randomly generated alphanumeric string. It splits the string into six equal chunks and joins them with the delimiter to create the final UID.

  • This function is primarily intended for internal use by the ThreadManager class to generate a unique identifier for each instance. It can be called by other methods within the class to ensure that the identifier is unique and consistent for each instance.

  • Note that the delimiter argument can be customized to fit the specific needs of the program or application, but it is recommended that a standard delimiter such as - or _ be used to ensure portability and compatibility with other systems.
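A minimal sketch of the chunk-and-join scheme described above. The function name generate_uid, the length parameter and its default of 24 characters are assumptions made for illustration; the documentation does not state how long the random string is.

```python
import random
import string


def generate_uid(delimiter="-", length=24):
    """Hypothetical sketch: random alphanumeric string in six equal chunks."""
    if length <= 0 or length % 6:
        raise ValueError("length must be a positive multiple of 6")
    alphabet = string.ascii_letters + string.digits
    raw = "".join(random.choices(alphabet, k=length))
    size = length // 6
    # Slice the string into six pieces of equal size and join them.
    chunks = [raw[i:i + size] for i in range(0, length, size)]
    return delimiter.join(chunks)


uid = generate_uid()
```

With the assumed defaults, uid consists of six 4-character groups separated by the - character.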

_clean_threads()

Cleans up the list of running threads in the ThreadManager instance.

This function removes completed threads from the _running_threads collection of a ThreadManager instance. It does so by creating a copy of _running_threads, looping through the copy to find completed threads, and removing them from the original dictionary.

Notes

  • This function is intended for internal use by the ThreadManager class to clean up the list of running threads after they have finished executing. It should be called regularly to prevent the running thread list from growing too large and potentially causing memory issues.

  • The function works by creating a shallow copy of the running threads dictionary to ensure that the original dictionary is not modified during iteration, which can cause unexpected behavior. If a thread is found to have finished executing, it is removed from the running threads dictionary to keep the list up to date.

  • Note that this function does not stop or halt any individual threads; it simply removes finished threads from the list of running threads. Any necessary cleanup or stopping of threads should be handled by other methods or functions within the ThreadManager class.

Return type:

None
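The copy-then-remove pattern described for _clean_threads() can be sketched as a standalone function. The name clean_threads and the assumption that the dictionary maps a UID to a thread object are illustrative; the real method operates on the instance's _running_threads attribute.

```python
import threading


def clean_threads(running_threads):
    """Remove finished threads from a dict of uid -> thread, in place."""
    # Iterate over a shallow copy so the original can be modified safely.
    for uid, thread in running_threads.copy().items():
        if not thread.is_alive():
            del running_threads[uid]


release = threading.Event()
finished = threading.Thread(target=lambda: None)
blocked = threading.Thread(target=release.wait, daemon=True)
running = {"a": finished, "b": blocked}
for thread in running.values():
    thread.start()
finished.join()  # make sure thread "a" has completed

clean_threads(running)
remaining = sorted(running)

release.set()  # let the second thread finish as well
blocked.join()
```

Only the entry for the completed thread is dropped; the thread that is still waiting stays registered until a later clean-up pass.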

_stop_threads()

Stops all running threads in the ThreadManager instance.

This function iterates over the list of running threads in the ThreadManager instance and stops each thread using the halt() function of the TracedThread class. It then waits for each stopped thread to complete using the join() function before cleaning up the list of running threads.

Important

Invoking the halt() function on a TracedThread instance may cause the thread to stop executing immediately, so it should be used with caution. Additionally, any necessary cleanup or stopping of threads should be handled by other methods or functions within the ThreadManager class.

Notes

  • This function is intended for internal use by the ThreadManager class to stop all running threads in a program abruptly. It should be called when the user requests to stop all threads, or when the program is about to exit.

  • The function works by creating a shallow copy of the running threads dictionary to ensure that the original dictionary is not modified during iteration, which can cause unexpected behavior. For each running thread, the function checks if it is still alive, and if so, halts the thread using the halt() function of the TracedThread class. It then waits for the thread to complete halting using the join() function before cleaning up the list of running threads.

Return type:

None
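The halt-then-join loop described above can be sketched as follows. To keep the example short and self-contained, StoppableThread is a hypothetical stand-in whose halt() merely sets an event the worker waits on; the documented TracedThread halts through its trace function instead. The function name stop_threads and the uid -> thread dictionary layout are likewise assumptions.

```python
import threading


class StoppableThread(threading.Thread):
    """Stand-in for TracedThread: halt() sets an event that ends run()."""

    def __init__(self):
        super().__init__(daemon=True)
        self._halt_event = threading.Event()

    def run(self):
        self._halt_event.wait()  # block until halt() is called

    def halt(self):
        self._halt_event.set()


def stop_threads(running_threads):
    """Halt and join every live thread, then drop it from the dict."""
    for uid, thread in running_threads.copy().items():
        if thread.is_alive():
            thread.halt()
            thread.join()
        del running_threads[uid]


workers = [StoppableThread(), StoppableThread()]
running = {"a": workers[0], "b": workers[1]}
for worker in workers:
    worker.start()

stop_threads(running)
```

Joining each thread right after halting it keeps the loop simple, at the cost of waiting for threads one at a time.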

_start_threads(watching_threads=False)

Starts threads requested in the queue for the ThreadManager object.

Parameters:

watching_threads (Optional[bool]) – When True, the method will check every so often to see if any new threads have been appended to the queue. Defaults to False.

Raises:
  • NoThreadsFoundError – Raised when there are no threads ready to run in the queue.

  • ThreadLimitReachedError – Raised when the maximum number of threads allowed to run in the calling ThreadManager instance has been reached.

Return type:

None

Notes

  • The _requested_threads collection keeps track of which threads in the queue need to be executed. The method checks whether any requested thread can be started without exceeding the thread limit, which is determined by the thread_limit of the calling ThreadManager instance.

  • If there are no threads in the queue, a NoThreadsFoundError is raised. If the maximum number of threads allowed to run in the calling instance has been reached, a ThreadLimitReachedError is raised. Once a thread has been started, it is added to the _running_threads attribute so it can be tracked.

  • If a thread has already been registered as running but is still in the queue, it is removed so that it doesn’t get executed twice.
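The queue handling described in the notes above might look roughly like this. It is a sketch under several assumptions: both collections are dictionaries keyed by UID, the exception classes are local stand-ins for the library's own, plain threading.Thread replaces TracedThread, and an entry is removed from the queue as soon as it is started.

```python
import threading


class NoThreadsFoundError(Exception):
    """Stand-in for the library's exception of the same name."""


class ThreadLimitReachedError(Exception):
    """Stand-in for the library's exception of the same name."""


def start_threads(requested, running, thread_limit):
    """Start queued (function, args, kwargs) entries until the limit is hit."""
    if not requested:
        raise NoThreadsFoundError("no threads in the queue")
    for uid, (function, args, kwargs) in requested.copy().items():
        if uid in running:
            del requested[uid]  # already running: never start it twice
            continue
        if len(running) >= thread_limit:
            raise ThreadLimitReachedError("Thread limit reached")
        thread = threading.Thread(
            target=function, args=args, kwargs=kwargs, daemon=True
        )
        running[uid] = thread   # track the thread once it is started
        del requested[uid]      # assumption: dequeue on start
        thread.start()


release = threading.Event()
requested = {uid: (release.wait, (), {}) for uid in ("a", "b", "c")}
running = {}
limit_hit = False
try:
    start_threads(requested, running, thread_limit=2)
except ThreadLimitReachedError:
    limit_hit = True

release.set()  # let the demo threads finish
for thread in running.values():
    thread.join()
```

With a limit of two, the first two entries are started and the third stays in the queue when ThreadLimitReachedError is raised.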

append_thread(function, args=(), kwargs={})

Appends a thread to the request queue for the ThreadManager object.

Parameters:
  • function (object) – The callable function to be executed.

  • args (Optional[tuple]) – A tuple of positional arguments to be passed into the function when executed. Defaults to ().

  • kwargs (Optional[dict[str, Any]]) – A dictionary of keyword arguments to be passed into the function when executed. Defaults to {}.

Returns:

True if the thread was successfully appended to the queue, False otherwise.

Return type:

bool

Notes

  • The _requested_threads collection is used to keep track of which threads can be executed when thread execution is started in the ThreadManager instance. Each thread is represented as a tuple that contains function, args and kwargs attributes, respectively. The function is the callable object that will be executed when the thread is run, while the args and kwargs are arguments that will be passed into the function once executed.

  • The method returns True if the thread was added to the queue successfully and False otherwise. Errors raised while creating the thread are caught: if the _generate_uid() method raises an exception, False is returned because the thread could not be added to the queue.
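A minimal sketch of the queueing behaviour described above, written as a standalone function. Storing entries in a dictionary keyed by UID is an assumption, uuid.uuid4() stands in for _generate_uid(), and the queue is passed in explicitly rather than read from the instance.

```python
import uuid


def append_thread(requested, function, args=(), kwargs=None):
    """Queue a (function, args, kwargs) tuple; return True on success."""
    try:
        uid = uuid.uuid4().hex  # stand-in for _generate_uid()
        requested[uid] = (function, args, dict(kwargs or {}))
    except Exception:
        # Any failure while building the entry means nothing was queued.
        return False
    return True


queue = {}
ok = append_thread(queue, print, ("hello",))
failed = append_thread(None, print)  # None cannot store entries, so this fails
```

The sketch uses None as the kwargs default and copies the dictionary, which avoids sharing one mutable default object between calls; the documented signature uses kwargs={}.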

stop()

Sets the _flag_request attribute to True, a signal that stops all running threads in the ThreadManager object.

Notes

The _flag_request attribute is a boolean flag that is used to indicate if there is a stop request from the ThreadManager object. When _flag_request is set to True, all running threads are stopped. This method sets the value of this attribute to True and does not return any value. Once this is done, the stop_listening() method is called by the ThreadManager object so that all the threads that are currently running can terminate.

Return type:

None

halt()

Halts all running threads in the ThreadManager object.

Notes

This method is used to stop (halt) all running threads in the ThreadManager object instance. The method achieves this by calling the _stop_threads() method of the ThreadManager object which terminates all running threads. After this method is called, the _running_threads collection of the ThreadManager is set to an empty dictionary. The method does not return anything.

Return type:

None

run()

Starts all threads in the queue for the ThreadManager instance.

Raises:

ThreadsAlreadyRunningError – Raised when new threads have already been started and are currently running.

Return type:

None

Notes

This method is used to start all threads that are in the _requested_threads collection of the ThreadManager object instance. The method calls the _start_threads() function of the ThreadManager object, and attempts to start all pending threads in the queue. The function raises a ThreadsAlreadyRunningError if new threads have already been started and are currently running to avoid starting new threads on top of already running threads.

watch()

Starts a background thread that continuously starts all threads in the request queue for the ThreadManager instance.

Raises:

ThreadManagerAlreadyRunningError – Raised when a manager has already been spawned and is currently watching for threads to start.

Return type:

None

Notes

This method starts a parent background thread that continuously tries to start child threads that are in the _requested_threads collection of the ThreadManager object. The method raises a ThreadManagerAlreadyRunningError if a manager has already been spawned and is currently watching for threads to start. It also avoids starting new threads on top of already running threads.

  • The _watch() function tries to start any pending threads by calling _start_threads(watching_threads=True).

  • If the thread limit is reached, the ThreadLimitReachedError exception is caught and the _clean_threads() method is called to clean up any inactive threads. Any other exception is re-raised unless its message starts with "Thread limit", in which case _clean_threads() is likewise called to clean up inactive threads.

The method adds each new thread that it starts to the _running_threads attribute of the ThreadManager object, as well as to the _requested_threads collection. It also sets the _currently_watching flag to True, which prevents subsequent calls to this method while a watcher is already active.
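The watch loop described above can be sketched as a background thread that repeatedly tries to start pending work and cleans up when the limit is hit. Everything named here is hypothetical: the watch() function takes the start and clean-up steps as callables, ThreadLimitReachedError is a local stand-in, and the _currently_watching guard is left out for brevity.

```python
import threading


class ThreadLimitReachedError(Exception):
    """Stand-in for the library's exception of the same name."""


def watch(start_pending, clean_finished, stop_requested, interval=0.01):
    """Call start_pending() in a daemon thread until stop_requested is set."""

    def _watch():
        while not stop_requested.is_set():
            try:
                start_pending()
            except ThreadLimitReachedError:
                clean_finished()  # make room by dropping finished threads
            stop_requested.wait(interval)  # pause between passes

    watcher = threading.Thread(target=_watch, daemon=True)
    watcher.start()
    return watcher


events = []
stop_requested = threading.Event()


def start_pending():
    events.append("start")
    raise ThreadLimitReachedError("Thread limit reached")


def clean_finished():
    events.append("clean")
    stop_requested.set()  # end the demo after a single pass


watcher = watch(start_pending, clean_finished, stop_requested)
watcher.join(timeout=2)
```

Waiting on the stop event instead of sleeping lets the watcher exit promptly once a stop has been requested.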