Topological Sort Algorithm

A topological sort algorithm is a method for ordering nodes in a directed acyclic graph (DAG).

It does so in such a way that, for every directed edge from node A to node B, node A appears before node B in the ordering. In simpler terms, it arranges the nodes of a graph in a linear order in which every dependency is satisfied.
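To make the definition concrete, here is a minimal sketch of a checker that tests whether a given ordering respects every edge. The function name `is_valid_topological_order` and the sample edges are illustrative assumptions, not part of the solutions further down.

```python
def is_valid_topological_order(order, edges):
    """Return True if every edge (a, b) has a placed before b in order.

    Hypothetical helper for illustration: `order` is a list of nodes and
    `edges` is a list of (source, destination) pairs.
    """
    position = {node: index for index, node in enumerate(order)}
    if len(position) != len(order):
        return False  # a node appears more than once
    return all(
        a in position and b in position and position[a] < position[b]
        for a, b in edges
    )


edges = [("A", "B"), ("A", "C"), ("C", "B")]
print(is_valid_topological_order(["A", "C", "B"], edges))  # True
print(is_valid_topological_order(["A", "B", "C"], edges))  # False: C must precede B
```

A checker like this is also handy in tests, because a graph usually has more than one valid ordering.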

Here's a general overview of how a topological sort algorithm works:

Identify a DAG

Ensure that the graph is a directed acyclic graph (DAG) because topological sorting cannot be performed on graphs with cycles.

Start with nodes that have no incoming edges

Begin by selecting nodes that have no incoming edges (i.e., nodes with no dependencies). These nodes are candidates to appear first in the topological ordering.

Remove nodes with no incoming edges

Remove the selected nodes from the graph along with their outgoing edges. This action may create new nodes with no incoming edges, so continue this process iteratively.

Repeat until all nodes are processed

Continue selecting and removing nodes that have no incoming edges (no remaining dependencies) until every node in the graph has been processed. Each time a node is selected, append it to the final topological ordering.

Handle cycles (optional)

If the graph contains cycles, topological sorting is not possible. In such cases, the algorithm may detect cycles and report an error or handle them using appropriate strategies.

Output the topological ordering

The final result of the topological sort algorithm is a linear ordering of the nodes, where each node appears before all of its successors in the graph.
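The steps above are commonly known as Kahn's algorithm. The following is a compact sketch that uses plain dictionaries instead of the node classes in the solutions below. The name `kahn_topological_order` is an assumption for illustration, and the sketch assumes that every edge endpoint also appears in `nodes`.

```python
from collections import deque


def kahn_topological_order(nodes, edges):
    """Return one topological order, or None if the graph contains a cycle.

    Illustrative sketch: `edges` holds (source, destination) pairs and every
    endpoint is assumed to be listed in `nodes`.
    """
    successors = {node: [] for node in nodes}
    in_degree = {node: 0 for node in nodes}
    for source, destination in edges:
        successors[source].append(destination)
        in_degree[destination] += 1

    # Start with the nodes that have no incoming edges.
    ready = deque(node for node in nodes if in_degree[node] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        # "Remove" the node by discounting its outgoing edges.
        for destination in successors[node]:
            in_degree[destination] -= 1
            if in_degree[destination] == 0:
                ready.append(destination)

    # If some node was never released, it sits on or behind a cycle.
    return order if len(order) == len(nodes) else None


print(kahn_topological_order(
    ["A", "B", "C", "D"],
    [("A", "B"), ("A", "C"), ("B", "D"), ("C", "D")],
))  # ['A', 'B', 'C', 'D']
```

Counting incoming edges rather than physically deleting nodes keeps the work proportional to the number of nodes plus edges.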

Topological sorting has various applications, such as task scheduling, dependency resolution, and determining the order of execution in workflows or pipelines.

Problem Case

You are given a list of jobs or tasks represented by integers. Think of these jobs as tasks that need to be done. For example, when working in a restaurant, these could include things like washing dishes, cooking food, serving food, or paying staff.

We are also given a list of dependencies, which are pairs of jobs with a special link between them. The first job in a pair is a prerequisite of the second job, i.e., it has to be completed before the second job can be started. The second job is dependent on the first job. The task is to find a valid order (a topological ordering) in which we can complete all the jobs while respecting the dependencies provided. If the dependencies contain a cycle, no such order exists.
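As a reference point for the expected input and output, here is a short sketch built on Python's standard `graphlib` module (available from Python 3.9). The wrapper name `order_jobs` is an assumption, and returning an empty list for a cycle is a choice made here to mirror the hand-written solutions that follow.

```python
from graphlib import CycleError, TopologicalSorter  # Python 3.9+


def order_jobs(jobs, deps):
    """Return a valid job order, or [] if the dependencies contain a cycle.

    Illustrative wrapper: `deps` holds (prerequisite, job) pairs.
    """
    sorter = TopologicalSorter()
    for job in jobs:
        sorter.add(job)  # register jobs that have no dependencies at all
    for prereq, job in deps:
        sorter.add(job, prereq)  # job must come after prereq
    try:
        return list(sorter.static_order())
    except CycleError:
        return []


jobs = [1, 2, 3, 4]
deps = [(1, 2), (1, 3), (3, 2), (4, 2), (4, 3)]
print(order_jobs(jobs, deps))  # one of [1, 4, 3, 2] or [4, 1, 3, 2]
```

Jobs 1 and 4 have no prerequisites, so either may come first. Job 3 must follow both, and job 2 must come last.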

Solution: Algorithm Approach 1

# O(j + d) time | O(j + d) space
def topogologicalSort1(jobs, deps):
    """
    Performs topological sorting to find the order in which jobs should be executed.

    Args:
        jobs (List[str]): A list of job names.
        deps (List[Tuple[str, str]]): A list of tuples representing dependencies between jobs.

    Returns:
        List[str]: A list of job names in the order they should be executed.
    """
    jobGraph = createJobGraph(jobs, deps)
    return getOrderedJobs(jobGraph)


def createJobGraph(jobs, deps):
    """
    Creates a job graph with the given jobs and dependencies.

    Args:
        jobs (List[str]): A list of job names.
        deps (List[Tuple[str, str]]): A list of tuples representing dependencies between jobs.

    Returns:
        JobGraph: The job graph representing the dependencies between jobs.
    """
    graph = JobGraph(jobs)
    for job, dep in deps:
        graph.addDep(job, dep)
    return graph


def getOrderedJobs(graph):
    """
    Performs topological sorting to find the order in which jobs should be executed.

    Args:
        graph (JobGraph): The job graph representing the dependencies between jobs.

    Returns:
        List[str]: A list of job names in the order they should be executed.
    """
    orderedJobs = []
    # Start with every job that has no prerequisites.
    nodesWithNoPrereqs = [node for node in graph.nodes if node.numOfPrereqs == 0]
    while nodesWithNoPrereqs:
        node = nodesWithNoPrereqs.pop()
        orderedJobs.append(node.job)
        removeDeps(node, nodesWithNoPrereqs)
    # Any job still waiting on a prerequisite means the graph has a cycle.
    graphHasEdges = any(node.numOfPrereqs for node in graph.nodes)
    return [] if graphHasEdges else orderedJobs


def removeDeps(node, nodesWithNoPrereqs):
    """
    Removes dependencies of a node and updates the list of nodes with no prerequisites.

    Args:
        node (JobNode): The node whose dependencies should be removed.
        nodesWithNoPrereqs (List[JobNode]): The list of nodes with no prerequisites.
    """
    while len(node.deps):
        dep = node.deps.pop()
        dep.numOfPrereqs -= 1
        if dep.numOfPrereqs == 0:
            nodesWithNoPrereqs.append(dep)





class JobGraph:
    """
    Represents a graph of jobs and their dependencies.

    Attributes:
        nodes (List[JobNode]): A list of all nodes (jobs) in the graph.
        graph (Dict[str, JobNode]): A dictionary mapping job names to JobNode objects.
    """
    def __init__(self, jobs):
        """
        Initializes the JobGraph with the given list of job names.

        Args:
            jobs (List[str]): A list of job names.
        """
        self.nodes = []
        self.graph = {}
        for job in jobs:
            self.addNode(job)

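    def addDep(self, job, dep):
        """
        Adds a dependency edge from a prerequisite job to the job that depends on it.

        Args:
            job (str): The name of the prerequisite job (must be completed first).
            dep (str): The name of the dependent job (can only run after `job`).
        """
        jobNode = self.getNode(job)
        depNode = self.getNode(dep)
        jobNode.deps.append(depNode)  # completing job unlocks dep
        depNode.numOfPrereqs += 1  # dep waits on one more prerequisite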

    def addNode(self, job):
        """
        Adds a new job node to the graph.

        Args:
            job (str): The name of the job to add.
        """
        self.graph[job] = JobNode(job)
        self.nodes.append(self.graph[job])

    def getNode(self, job):
        """
        Retrieves the node (JobNode object) corresponding to the given job name.
        If the job does not exist in the graph, it creates a new node.

        Args:
            job (str): The name of the job.

        Returns:
            JobNode: The node corresponding to the given job.
        """
        if job not in self.graph:
            self.addNode(job)
        return self.graph[job]


class JobNode:
    """
    Represents a node in the job graph.

    Attributes:
        job (str): The name of the job.
        deps (List[JobNode]): A list of dependency nodes.
        numOfPrereqs (int): The number of prerequisites for the job.
    """
    def __init__(self, job):
        """
        Initializes a JobNode with the given job name.

        Args:
            job (str): The name of the job.
        """
        self.job = job
        self.prereqs = []  # Not used in this implementation
        self.numOfPrereqs = 0
        self.deps = []

 

Time & Space Complexity

Let n be the number of jobs and m be the number of dependencies (written as j and d in the comment at the top of the code).

Time Complexity

Creating the nodes of the job graph: O(n)
Adding the dependencies (edges) to the graph: O(m)
Processing dependencies and performing topological sorting:

  •  Traversing each node: O(n)
  •  Traversing each dependency: O(m)
  •  Overall time complexity for processing dependencies: O(n + m)

The overall time complexity is dominated by the process of traversing nodes and dependencies, resulting in a time complexity of O(n + m).

Space Complexity

Job graph data structure:

  • Storing the nodes and their dependencies: O(n + m)

Temporary storage during sorting:

  •  Storing the list of nodes with no prerequisites: O(n)
  •  Storing the list of dependencies: O(m)

The overall space complexity is dominated by the job graph data structure and temporary storage during sorting, resulting in a space complexity of O(n + m).

In summary, the time complexity of the provided solution is O(n + m), and the space complexity is also O(n + m), where n is the number of jobs and m is the number of dependencies.

 

Solution: Algorithm Approach 2

The aim of this solution is to solve the topological sorting problem, which involves determining the order in which jobs should be executed based on their dependencies. Here's the thought process behind the solution:

Graph Representation

The problem can be modeled as a directed graph, where each job is a node and directed edges represent dependencies between jobs.

Job Graph Construction

The first step is to construct a graph representing the jobs and their dependencies. This is done by iterating through the list of jobs and the list of dependencies, adding a node for each job and recording each prerequisite on the job that depends on it.

Topological Sorting

Once the graph is constructed, a topological sort algorithm is applied to determine the order in which jobs should be executed. It performs a depth-first traversal of the graph: before a job is processed, all of its prerequisites are visited first.

Cycle Detection

During the traversal, the algorithm checks for cycles in the graph. If a cycle is detected, it indicates that the jobs have cyclic dependencies, making it impossible to determine a valid execution order. In such cases, an empty list is returned.

Ordered Jobs

A job is appended to the ordered list only after all of its prerequisites have been appended, so the list is always a valid execution order for the jobs it contains. Once the traversal is complete, the ordered list of jobs is returned as the result.

Node States

To facilitate cycle detection and traversal, each node in the graph maintains two flags: visited and visiting. These flags help track the traversal state of each node and detect cycles during traversal.
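The two flags amount to three states per node: not yet seen, currently on the traversal path, and fully processed. Reaching a node that is still on the current path means the traversal has walked around a cycle. Below is a minimal sketch of just the cycle check, using a dictionary that maps each job to its prerequisites. The names `State` and `has_cycle` are assumptions for illustration, and every job is assumed to be a key of the dictionary.

```python
from enum import Enum


class State(Enum):
    UNVISITED = 0  # visited == False, visiting == False
    VISITING = 1   # currently on the traversal path
    VISITED = 2    # fully processed


def has_cycle(prereqs):
    """Return True if the prerequisite graph contains a cycle.

    Illustrative sketch: `prereqs` maps each job to a list of its
    prerequisite jobs, and every job is assumed to be a key.
    """
    state = {job: State.UNVISITED for job in prereqs}

    def visit(job):
        if state[job] is State.VISITED:
            return False
        if state[job] is State.VISITING:
            return True  # reached a job that is still on the current path
        state[job] = State.VISITING
        if any(visit(prereq) for prereq in prereqs[job]):
            return True
        state[job] = State.VISITED
        return False

    return any(visit(job) for job in prereqs)


print(has_cycle({1: [], 2: [1], 3: [2]}))    # False
print(has_cycle({1: [3], 2: [1], 3: [2]}))   # True
```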

Overall, the solution effectively constructs a job graph, performs topological sorting to determine the execution order of jobs, and handles cycle detection to ensure the validity of the result.

# O(j + d) time | O(j + d) space
def topogologicalSort1(jobs, deps):
    """
    Performs topological sorting to determine the order of jobs' execution.

    Args:
        jobs (List[str]): List of job names.
        deps (List[Tuple[str, str]]): List of tuples representing job dependencies.

    Returns:
        List[str]: Ordered list of job names in the order they should be executed.
                  An empty list is returned if a cycle is detected, indicating impossibility of sorting.

    Time Complexity: O(V + E), where V is the number of jobs and E is the number of dependencies.
    Space Complexity: O(V + E) for storing the job graph and temporary data during traversal.
    """
    jobGraph = createJobGraph(jobs, deps)
    return getOrderedJobs(jobGraph)


def createJobGraph(jobs, deps):
    """
    Creates a job graph with the given jobs and their dependencies.

    Args:
        jobs (List[str]): List of job names.
        deps (List[Tuple[str, str]]): List of tuples representing job dependencies.

    Returns:
        JobGraph: The constructed job graph containing job nodes and their dependencies.
    """
    graph = JobGraph(jobs)
    for prereq, job in deps:
        graph.addPrereq(job, prereq)
    return graph


def getOrderedJobs(graph):
    """
    Performs depth-first traversal to determine the order of jobs' execution.

    Args:
        graph (JobGraph): Job graph containing job nodes and their dependencies.

    Returns:
        List[str]: Ordered list of job names in the order they should be executed.
                  An empty list is returned if a cycle is detected, indicating impossibility of sorting.
    """
    orderedJobs = []
    nodes = list(graph.nodes)  # copy so the graph's own node list is not emptied
    while nodes:
        node = nodes.pop()
        containsCycle = depthFirstTraverse(node, orderedJobs)
        if containsCycle:
            return []
    return orderedJobs


def depthFirstTraverse(node, orderedJobs):
    """
    Performs depth-first traversal to detect cycles and determine the order of jobs' execution.

    Args:
        node (JobNode): Current job node being visited.
        orderedJobs (List[str]): List to store the ordered job names.

    Returns:
        bool: True if a cycle is detected, False otherwise.
    """
    if node.visited:
        return False
    if node.visiting:
        return True
    node.visiting = True
    for prereqNode in node.prereqs:
        containsCycle = depthFirstTraverse(prereqNode, orderedJobs)
        if containsCycle:
            return True
    node.visited = True
    node.visiting = False  # Reset visiting flag (not mandatory)
    orderedJobs.append(node.job)
    return False


class JobGraph:
    """
    Represents a graph of jobs and their dependencies.
    """
    def __init__(self, jobs):
        """
        Initializes the job graph with the given list of job names.

        Args:
            jobs (List[str]): List of job names.
        """
        self.nodes = []
        self.graph = {}
        for job in jobs:
            self.addNode(job)

    def addPrereq(self, job, prereq):
        """
        Adds a prerequisite dependency between two jobs.

        Args:
            job (str): The name of the dependent job.
            prereq (str): The name of the prerequisite job.
        """
        jobNode = self.getNode(job)
        prereqNode = self.getNode(prereq)
        jobNode.prereqs.append(prereqNode)

    def addNode(self, job):
        """
        Adds a new job node to the graph.

        Args:
            job (str): The name of the job to add.
        """
        self.graph[job] = JobNode(job)
        self.nodes.append(self.graph[job])

    def getNode(self, job):
        """
        Retrieves the node corresponding to the given job name.
        Creates a new node if the job does not exist in the graph.

        Args:
            job (str): The name of the job.

        Returns:
            JobNode: The node corresponding to the given job.
        """
        if job not in self.graph:
            self.addNode(job)
        return self.graph[job]


class JobNode:
    """
    Represents a node (job) in the job graph.
    """
    def __init__(self, job):
        """
        Initializes a job node with the given job name.

        Args:
            job (str): The name of the job.
        """
        self.job = job
        self.prereqs = []  # List of prerequisite job nodes
        self.visited = False  # Flag to track visited status during traversal
        self.visiting = False  # Flag to track currently visiting status during traversal

Time & Space Complexity

The time and space complexity of this solution can be analyzed as follows:


Time Complexity

The time complexity of creating the job graph (createJobGraph function) is O(V + E), where V is the number of vertices (jobs), and E is the number of edges (dependencies). We iterate once through all jobs to create the nodes and once through all dependencies to record the prerequisites.

The time complexity of the depth-first traversal (depthFirstTraverse function) is also O(V + E). The visited flag ensures that each node is fully processed only once, and each prerequisite edge is followed only once.

Therefore, the overall time complexity of the topological sorting algorithm is O(V + E).

Space Complexity

The space complexity of the job graph (instances of the JobGraph and JobNode classes) is O(V + E), where V is the number of vertices (jobs) and E is the number of edges (dependencies). Each job node is stored once, and each dependency is stored once in the prereqs list of its dependent job.

The space complexity of the ordered jobs list (orderedJobs variable) is O(V), as it contains every job in the graph when no cycle is found.

Additionally, the depth-first traversal uses auxiliary space for the boolean flags (visited and visiting) and for the recursion stack, which can grow to V frames on a long chain of dependencies. This contributes O(V).

Therefore, the overall space complexity of the solution is O(V + E), which matches the O(j + d) bound stated in the comment at the top of the code.
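One practical consequence of the recursion stack: CPython limits recursion depth (1000 frames by default), so a recursive traversal can raise RecursionError on a very long dependency chain. The following sketch shows one way to run the same depth-first idea with an explicit stack. It is an alternative formulation rather than the solution above, and the name `iterative_dfs_order` is an assumption for illustration. Every job named in `deps` is assumed to be listed in `jobs`.

```python
def iterative_dfs_order(jobs, deps):
    """Depth-first topological sort with an explicit stack.

    Illustrative sketch: `deps` holds (prerequisite, job) pairs.
    Returns [] if a cycle is found.
    """
    prereqs = {job: [] for job in jobs}
    for prereq, job in deps:
        prereqs[job].append(prereq)

    UNVISITED, VISITING, VISITED = 0, 1, 2
    state = {job: UNVISITED for job in jobs}
    order = []

    for start in jobs:
        if state[start] != UNVISITED:
            continue
        state[start] = VISITING
        # Each stack entry pairs a job with an iterator over its prerequisites,
        # so the scan resumes where it left off after a child is finished.
        stack = [(start, iter(prereqs[start]))]
        while stack:
            job, remaining = stack[-1]
            descended = False
            for prereq in remaining:
                if state[prereq] == VISITING:
                    return []  # prerequisite is on the current path: cycle
                if state[prereq] == UNVISITED:
                    state[prereq] = VISITING
                    stack.append((prereq, iter(prereqs[prereq])))
                    descended = True
                    break
            if not descended:
                # All prerequisites are done, so the job itself can be emitted.
                state[job] = VISITED
                order.append(job)
                stack.pop()
    return order


# A chain of 5000 jobs, started from the last job so the stack gets deep.
chain_jobs = list(range(4999, -1, -1))
chain_deps = [(i, i + 1) for i in range(4999)]
print(iterative_dfs_order(chain_jobs, chain_deps)[:5])  # [0, 1, 2, 3, 4]
```

The asymptotic time and space bounds are unchanged. The stack simply lives in a Python list instead of in interpreter call frames.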

 

Unit Test

import unittest

class TestTopologicalSort(unittest.TestCase):

    def test_topological_sort(self):
        # Test case with a simple dependency graph
        jobs = [1, 2, 3, 4]
        deps = [(1, 2), (1, 3), (3, 2), (4, 2), (4, 3)]
        expected_results = [[1, 4, 3, 2], [4, 1, 3, 2]]  # List of valid orderings
        actual_result = topogologicalSort1(jobs, deps)
        self.assertIn(actual_result, expected_results)


    def test_topological_sort_empty_deps(self):
        # Test case with no dependencies
        jobs = [1, 2, 3]
        deps = []
        expected_result = {1, 2, 3}  # Use set instead of list
        actual_result = set(topogologicalSort1(jobs, deps))  # Convert the actual result to a set
        self.assertSetEqual(actual_result, expected_result)


    def test_topological_sort_circular_dependency(self):
        # Test case with circular dependency
        jobs = [1, 2, 3]
        deps = [(1, 2), (2, 3), (3, 1)]
        expected_result = []
        self.assertEqual(topogologicalSort1(jobs, deps), expected_result)

    def test_topological_sort_duplicate_deps(self):
        # Test case with duplicate dependencies
        jobs = [1, 2, 3]
        deps = [(1, 2), (1, 2), (2, 3)]
        expected_result = [1, 2, 3]
        self.assertEqual(topogologicalSort1(jobs, deps), expected_result)

if __name__ == "__main__":
    unittest.main()

 

 

 

 
