Skip to content

Reference

Planners

A*

seally.planners.a_star

AStar

A* Algorithm for finding the shortest path between points in a enviroment.

Source code in src/seally/planners/a_star.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class AStar():
    """
    A* Algorithm for finding the shortest path between points in a enviroment.
    """
    def __init__(self, env: Enviroment, heuristic: Callable[[Position, Position], float]):
        """
        Initialize an A* Object.

        Args:
            env: An Enviroment to Search.
            heuristic: The "Cost to Go" heuristic. 
        """
        self.env = env
        self.heuristic = heuristic

    def compute_path(self, source: Position, goal: Position) -> Path:
        """
        Computes the shortest path from the source position to the goal position using the A* Algorithm.

        Args:
            source: The source position in the enviroment.
            goal: The goal position in the enviroment.

        Returns:
            The shortest path from source to goal.
        """

        if not self.env.in_bounds(source) or self.env.is_occupied(source):
            raise Exception("Source is not a valid position")

        if not self.env.in_bounds(goal) or self.env.is_occupied(goal):
            raise Exception("Goal is not a valid position")

        tie_count = 0
        open_set: List[Tuple[float, int, Position]] = []
        heapq.heappush(open_set, (0.0, tie_count, source))

        came_from: Dict[Position, Optional[Position]] = {source: None}
        cost_so_far: Dict[Position, float] = {source: 0.0}
        closed_set = set()  # track fully explored cells

        while open_set:
            _, _, current = heapq.heappop(open_set)

            if current in closed_set:  # already explored optimally, skip
                continue

            closed_set.add(current)

            if current == goal:
                break

            for next in self.env.get_neighbors(current):
                if self.env.is_occupied(next) or next in closed_set:
                    continue

                new_cost = cost_so_far[current] + self.env.get_cost(current, next)
                if next not in cost_so_far or new_cost < cost_so_far[next]:
                    cost_so_far[next] = new_cost
                    priority = new_cost + self.heuristic(next, goal)
                    tie_count += 1

                    heapq.heappush(open_set, (priority, tie_count, next))
                    came_from[next] = current

        if current != goal:
            raise Exception("Path not found")

        path: List[Position] = []
        trace: Optional[Position] = goal
        while trace is not None:
            path.append(trace)
            trace = came_from.get(trace)
        path.reverse()
        return path

compute_path(source, goal)

Computes the shortest path from the source position to the goal position using the A* Algorithm.

Parameters:

Name Type Description Default
source Position

The source position in the enviroment.

required
goal Position

The goal position in the enviroment.

required

Returns:

Type Description
Path

The shortest path from source to goal.

Source code in src/seally/planners/a_star.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def compute_path(self, source: Position, goal: Position) -> Path:
    """
    Computes the shortest path from the source position to the goal position using the A* Algorithm.

    Args:
        source: The source position in the enviroment.
        goal: The goal position in the enviroment.

    Returns:
        The shortest path from source to goal.
    """

    if not self.env.in_bounds(source) or self.env.is_occupied(source):
        raise Exception("Source is not a valid position")

    if not self.env.in_bounds(goal) or self.env.is_occupied(goal):
        raise Exception("Goal is not a valid position")

    tie_count = 0
    open_set: List[Tuple[float, int, Position]] = []
    heapq.heappush(open_set, (0.0, tie_count, source))

    came_from: Dict[Position, Optional[Position]] = {source: None}
    cost_so_far: Dict[Position, float] = {source: 0.0}
    closed_set = set()  # track fully explored cells

    while open_set:
        _, _, current = heapq.heappop(open_set)

        if current in closed_set:  # already explored optimally, skip
            continue

        closed_set.add(current)

        if current == goal:
            break

        for next in self.env.get_neighbors(current):
            if self.env.is_occupied(next) or next in closed_set:
                continue

            new_cost = cost_so_far[current] + self.env.get_cost(current, next)
            if next not in cost_so_far or new_cost < cost_so_far[next]:
                cost_so_far[next] = new_cost
                priority = new_cost + self.heuristic(next, goal)
                tie_count += 1

                heapq.heappush(open_set, (priority, tie_count, next))
                came_from[next] = current

    if current != goal:
        raise Exception("Path not found")

    path: List[Position] = []
    trace: Optional[Position] = goal
    while trace is not None:
        path.append(trace)
        trace = came_from.get(trace)
    path.reverse()
    return path

seally.planners.bfs

BFS

Breadth First Search algorithm for finding paths between points in a enviroment.

Source code in src/seally/planners/bfs.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class BFS():
    """
    Breadth First Search algorithm for finding paths between points in a enviroment.
    """
    def __init__(self, env: Enviroment):
        """
        Initialize a BFS Object.

        Args:
            env: An Enviroment to search.
        """
        self.env = env

    def compute_path(self, source: Position, goal: Position) -> Path:
        """
        Computes a path from the source position to the goal position using the Breadth First Search Algorithm.

        Args:
            source: The source position in the enviroment.
            goal: The goal position in the enviroment.

        Returns:
            A path from source to goal.
        """

        if not self.env.in_bounds(source) or self.env.is_occupied(source):
            raise Exception("Source is not a valid position")

        if not self.env.in_bounds(goal) or self.env.is_occupied(goal):
            raise Exception("Goal is not a valid position")

        open_set = deque([source])
        visited = {source}
        came_from: dict = {source: None}

        while open_set:
            current = open_set.popleft()
            if current == goal:
                break

            for next_cell in self.env.get_neighbors(current):
                if self.env.is_occupied(next_cell) or next_cell in visited:
                    continue

                visited.add(next_cell)
                came_from[next_cell] = current
                open_set.append(next_cell)

        if current != goal:
            raise Exception("Path not found")

        path: List[Position] = []
        trace: Optional[Position] = goal
        while trace is not None:
            path.append(trace)
            trace = came_from.get(trace)
        path.reverse()
        return path

compute_path(source, goal)

Computes a path from the source position to the goal position using the Breadth First Search Algorithm.

Parameters:

Name Type Description Default
source Position

The source position in the enviroment.

required
goal Position

The goal position in the enviroment.

required

Returns:

Type Description
Path

A path from source to goal.

Source code in src/seally/planners/bfs.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def compute_path(self, source: Position, goal: Position) -> Path:
    """
    Computes a path from the source position to the goal position using the Breadth First Search Algorithm.

    Args:
        source: The source position in the enviroment.
        goal: The goal position in the enviroment.

    Returns:
        A path from source to goal.
    """

    if not self.env.in_bounds(source) or self.env.is_occupied(source):
        raise Exception("Source is not a valid position")

    if not self.env.in_bounds(goal) or self.env.is_occupied(goal):
        raise Exception("Goal is not a valid position")

    open_set = deque([source])
    visited = {source}
    came_from: dict = {source: None}

    while open_set:
        current = open_set.popleft()
        if current == goal:
            break

        for next_cell in self.env.get_neighbors(current):
            if self.env.is_occupied(next_cell) or next_cell in visited:
                continue

            visited.add(next_cell)
            came_from[next_cell] = current
            open_set.append(next_cell)

    if current != goal:
        raise Exception("Path not found")

    path: List[Position] = []
    trace: Optional[Position] = goal
    while trace is not None:
        path.append(trace)
        trace = came_from.get(trace)
    path.reverse()
    return path

seally.planners.dfs

DFS

Depth First Search algorithm for finding paths between points in a enviroment.

Source code in src/seally/planners/dfs.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class DFS():
    """
    Depth First Search algorithm for finding paths between points in a enviroment.
    """
    def __init__(self, env: Enviroment):
        """
        Initialize a DFS Object.

        Args:
            env: An Enviroment to search.
        """
        self.env = env

    def compute_path(self, source: Position, goal: Position) -> Path:
        """
        Computes a path from the source position to the goal position using the Depth First Search Algorithm.

        Args:
            source: The source position in the enviroment.
            goal: The goal position in the enviroment.

        Returns:
            A path from source to goal.
        """

        if not self.env.in_bounds(source) or self.env.is_occupied(source):
            raise Exception("Source is not a valid position")

        if not self.env.in_bounds(goal) or self.env.is_occupied(goal):
            raise Exception("Goal is not a valid position")

        open_set = [source]
        visited = {source}
        came_from: Dict[Position, Optional[Position]] = {source: None}

        current = source
        while open_set:
            current = open_set.pop()

            if current == goal:
                break

            for next_cell in self.env.get_neighbors(current):
                if self.env.is_occupied(next_cell) or next_cell in visited:
                    continue
                visited.add(next_cell)
                came_from[next_cell] = current
                open_set.append(next_cell)

        if current != goal:
            raise Exception("Path not found")

        path: List[Position] = []
        trace: Optional[Position] = goal
        while trace is not None:
            path.append(trace)
            trace = came_from.get(trace)
        path.reverse()
        return path

compute_path(source, goal)

Computes a path from the source position to the goal position using the Depth First Search Algorithm.

Parameters:

Name Type Description Default
source Position

The source position in the enviroment.

required
goal Position

The goal position in the enviroment.

required

Returns:

Type Description
Path

A path from source to goal.

Source code in src/seally/planners/dfs.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def compute_path(self, source: Position, goal: Position) -> Path:
    """
    Computes a path from the source position to the goal position using the Depth First Search Algorithm.

    Args:
        source: The source position in the enviroment.
        goal: The goal position in the enviroment.

    Returns:
        A path from source to goal.
    """

    if not self.env.in_bounds(source) or self.env.is_occupied(source):
        raise Exception("Source is not a valid position")

    if not self.env.in_bounds(goal) or self.env.is_occupied(goal):
        raise Exception("Goal is not a valid position")

    open_set = [source]
    visited = {source}
    came_from: Dict[Position, Optional[Position]] = {source: None}

    current = source
    while open_set:
        current = open_set.pop()

        if current == goal:
            break

        for next_cell in self.env.get_neighbors(current):
            if self.env.is_occupied(next_cell) or next_cell in visited:
                continue
            visited.add(next_cell)
            came_from[next_cell] = current
            open_set.append(next_cell)

    if current != goal:
        raise Exception("Path not found")

    path: List[Position] = []
    trace: Optional[Position] = goal
    while trace is not None:
        path.append(trace)
        trace = came_from.get(trace)
    path.reverse()
    return path

Dijkstra's Algorithm

seally.planners.dijkstras

Dijkstras

Dijkstra's Algorithm for finding the shortest path between points in a enviroment.

Source code in src/seally/planners/dijkstras.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class Dijkstras():
    """
    Dijkstra's Algorithm for finding the shortest path between points in a enviroment.
    """
    def __init__(self, env: Enviroment):
        """
        Initialize a Dijkstra's Algorithm Object.

        Args:
            env: An Enviroment to Search.
        """
        self.env = env

    def compute_path(self, source: Position, goal: Position) -> Path:
        """
        Computes the shortest path from the source position to the goal position using Dijkstra's Algorithm.

        Args:
            source: The source position in the enviroment.
            goal: The goal position in the enviroment.

        Returns:
            The shortest path from source to goal.
        """

        if not self.env.in_bounds(source) or self.env.is_occupied(source):
            raise Exception("Source is not a valid position")

        if not self.env.in_bounds(goal) or self.env.is_occupied(goal):
            raise Exception("Goal is not a valid position")

        tie_count = 0
        open_set: List[Tuple[float, int, Position]] = []
        heapq.heappush(open_set, (0.0, tie_count, source))

        came_from: Dict[Position, Optional[Position]] = {source: None}
        cost_so_far: Dict[Position, float] = {source: 0.0}
        closed_set = set()  # track fully explored cells

        while open_set:
            _, _, current = heapq.heappop(open_set)

            if current in closed_set:  # already explored optimally, skip
                continue

            closed_set.add(current)

            if current == goal:
                break

            for next in self.env.get_neighbors(current):
                if self.env.is_occupied(next) or next in closed_set:
                    continue

                new_cost = cost_so_far[current] + self.env.get_cost(current, next)
                if next not in cost_so_far or new_cost < cost_so_far[next]:
                    cost_so_far[next] = new_cost
                    priority = new_cost
                    tie_count += 1

                    heapq.heappush(open_set, (priority, tie_count, next))
                    came_from[next] = current

        if current != goal:
            raise Exception("Path not found")

        path: List[Position] = []
        trace: Optional[Position] = goal
        while trace is not None:
            path.append(trace)
            trace = came_from.get(trace)
        path.reverse()
        return path

compute_path(source, goal)

Computes the shortest path from the source position to the goal position using Dijkstra's Algorithm.

Parameters:

Name Type Description Default
source Position

The source position in the enviroment.

required
goal Position

The goal position in the enviroment.

required

Returns:

Type Description
Path

The shortest path from source to goal.

Source code in src/seally/planners/dijkstras.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def compute_path(self, source: Position, goal: Position) -> Path:
    """
    Computes the shortest path from the source position to the goal position using Dijkstra's Algorithm.

    Args:
        source: The source position in the enviroment.
        goal: The goal position in the enviroment.

    Returns:
        The shortest path from source to goal.
    """

    if not self.env.in_bounds(source) or self.env.is_occupied(source):
        raise Exception("Source is not a valid position")

    if not self.env.in_bounds(goal) or self.env.is_occupied(goal):
        raise Exception("Goal is not a valid position")

    tie_count = 0
    open_set: List[Tuple[float, int, Position]] = []
    heapq.heappush(open_set, (0.0, tie_count, source))

    came_from: Dict[Position, Optional[Position]] = {source: None}
    cost_so_far: Dict[Position, float] = {source: 0.0}
    closed_set = set()  # track fully explored cells

    while open_set:
        _, _, current = heapq.heappop(open_set)

        if current in closed_set:  # already explored optimally, skip
            continue

        closed_set.add(current)

        if current == goal:
            break

        for next in self.env.get_neighbors(current):
            if self.env.is_occupied(next) or next in closed_set:
                continue

            new_cost = cost_so_far[current] + self.env.get_cost(current, next)
            if next not in cost_so_far or new_cost < cost_so_far[next]:
                cost_so_far[next] = new_cost
                priority = new_cost
                tie_count += 1

                heapq.heappush(open_set, (priority, tie_count, next))
                came_from[next] = current

    if current != goal:
        raise Exception("Path not found")

    path: List[Position] = []
    trace: Optional[Position] = goal
    while trace is not None:
        path.append(trace)
        trace = came_from.get(trace)
    path.reverse()
    return path

seally.planners.gbf

GreedyBestFirst

Greedy Best First Search algorithm for finding paths between points in a enviroment.

Source code in src/seally/planners/gbf.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class GreedyBestFirst():
    """
    Greedy Best First Search algorithm for finding paths between points in a enviroment.
    """
    def __init__(self, env: Enviroment, heuristic: Callable[[Position, Position], float]):
        """
        Initialize a GBF Object.

        Args:
            env: An Enviroment to search.
            heuristic: The "Cost to Go" heuristic. 
        """
        self.env = env
        self.heuristic = heuristic

    def compute_path(self, source: Position, goal: Position) -> Path:
        """
        Computes a path from the source position to the goal position using the Greedy Best First Search Algorithm.

        Args:
            source: The source position in the enviroment.
            goal: The goal position in the enviroment.

        Returns:
            A path from source to goal.
        """

        if not self.env.in_bounds(source) or self.env.is_occupied(source):
            raise Exception("Source is not a valid position")

        if not self.env.in_bounds(goal) or self.env.is_occupied(goal):
            raise Exception("Goal is not a valid position")

        tie_count = 0
        open_set: List[Tuple[float, int, Position]] = []
        heapq.heappush(open_set, (0.0, tie_count, source))

        came_from: Dict[Position, Optional[Position]] = {source: None}
        closed_set = set()  # track fully explored cells

        while open_set:
            _, _, current = heapq.heappop(open_set)

            if current in closed_set:  # already explored optimally, skip
                continue

            closed_set.add(current)

            if current == goal:
                break

            for next in self.env.get_neighbors(current):
                if self.env.is_occupied(next) or next in closed_set:
                    continue

                if next not in came_from:
                    priority = self.heuristic(next, goal)
                    tie_count += 1

                    heapq.heappush(open_set, (priority, tie_count, next))
                    came_from[next] = current

        if current != goal:
            raise Exception("Path not found")

        path: List[Position] = []
        trace: Optional[Position] = goal
        while trace is not None:
            path.append(trace)
            trace = came_from.get(trace)
        path.reverse()
        return path

compute_path(source, goal)

Computes a path from the source position to the goal position using the Greedy Best First Search Algorithm.

Parameters:

Name Type Description Default
source Position

The source position in the enviroment.

required
goal Position

The goal position in the enviroment.

required

Returns:

Type Description
Path

A path from source to goal.

Source code in src/seally/planners/gbf.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def compute_path(self, source: Position, goal: Position) -> Path:
    """
    Computes a path from the source position to the goal position using the Greedy Best First Search Algorithm.

    Args:
        source: The source position in the enviroment.
        goal: The goal position in the enviroment.

    Returns:
        A path from source to goal.
    """

    if not self.env.in_bounds(source) or self.env.is_occupied(source):
        raise Exception("Source is not a valid position")

    if not self.env.in_bounds(goal) or self.env.is_occupied(goal):
        raise Exception("Goal is not a valid position")

    tie_count = 0
    open_set: List[Tuple[float, int, Position]] = []
    heapq.heappush(open_set, (0.0, tie_count, source))

    came_from: Dict[Position, Optional[Position]] = {source: None}
    closed_set = set()  # track fully explored cells

    while open_set:
        _, _, current = heapq.heappop(open_set)

        if current in closed_set:  # already explored optimally, skip
            continue

        closed_set.add(current)

        if current == goal:
            break

        for next in self.env.get_neighbors(current):
            if self.env.is_occupied(next) or next in closed_set:
                continue

            if next not in came_from:
                priority = self.heuristic(next, goal)
                tie_count += 1

                heapq.heappush(open_set, (priority, tie_count, next))
                came_from[next] = current

    if current != goal:
        raise Exception("Path not found")

    path: List[Position] = []
    trace: Optional[Position] = goal
    while trace is not None:
        path.append(trace)
        trace = came_from.get(trace)
    path.reverse()
    return path

seally.planners.wave_front

WaveFront

Wave Front Algorithm for finding the shortest path between points in a GridMap.

Source code in src/seally/planners/wave_front.py
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class WaveFront():
    """
    Wave Front Algorithm for finding the shortest path between points in a GridMap.
    """
    def __init__(self, env: GridMap):
        """
        Initialize a WaveFront Object.

        Args:
            env: A grid base enviroment.
        """
        self.env = env
        self.wave_field = copy.deepcopy(env)
        self._closed_set = set()  # track fully explored cells
        self._goal_dist = 2 # Defualt Goal distance value

    def calc_wavefront(self, goal: GridCell) -> None:
        """
        Generates a wave front by calculating the distance from each cell in the GridMap to the goal.

        Args:
            goal: The goal cell in the GridMap.
        """
        queue = deque([goal])
        self._closed_set.add(goal)

        while queue:
            # Get the oldest cell in the queue
            current = queue.popleft()

             # Get all the neighbors of the cell
            for n in self.env.get_neighbors(current):
                # The Wave value for each neighbor cell is the current value plus 1
                if not self.env.is_occupied(n) and n not in self._closed_set:
                    self.wave_field.map[n.y, n.x] = self.wave_field.map[current.y, current.x] + self.env.get_cost(current, n)

                    self._closed_set.add(n) # Add the visited cell to the closed set
                    queue.append(n) # add the cell back into the queue so that it can be visited

    def compute_path(self, source: GridCell, goal: GridCell) -> Path:
        """
        Computes the shortest path from the source GridCell to the goal GridCell using the Wave Front Algorithm.

        Args:
            source: The source cell in the GridMap.
            goal: The goal cell in the GridMap.

        Returns:
            A path from source to goal.
        """

        # reset class variavbles
        self.wave_field = copy.deepcopy(self.env)
        self._closed_set = set()

        if not self.env.in_bounds(source) or self.env.is_occupied(source):
            raise Exception("Source is not a valid position")

        if not self.env.in_bounds(goal) or self.env.is_occupied(goal):
            raise Exception("Goal is not a valid position")

        # Set the goals distance value in the wave_feild
        self.wave_field.map[goal.y, goal.x] = self._goal_dist

        # Add the goal to the closed set
        self._closed_set.add(goal)

        # Expand a wave from the goal to the source position
        self.calc_wavefront(goal)

        path = []
        current = source

        # Traverse the Wave Feild from the source to the goal
        while current != goal:
            path.append(current)
            neighbors = self.env.get_neighbors(current)

            next_cell = None
            next_val = float('inf')

            # Find the neighbor with the lowest wave value
            for n in neighbors:
                if self.env.is_occupied(n):
                    continue

                val = self.wave_field.map[n.y, n.x]
                if val > 0 and val < next_val:
                    next_cell = n
                    next_val = val

            # Path could not be found
            if next_cell is None or next_cell == current:
                raise Exception("Goal is unreachable from current position")

            # Visit Next cell
            current = next_cell

        path.append(goal)
        return path

calc_wavefront(goal)

Generates a wave front by calculating the distance from each cell in the GridMap to the goal.

Parameters:

Name Type Description Default
goal GridCell

The goal cell in the GridMap.

required
Source code in src/seally/planners/wave_front.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def calc_wavefront(self, goal: GridCell) -> None:
    """
    Generates a wave front by calculating the distance from each cell in the GridMap to the goal.

    Args:
        goal: The goal cell in the GridMap.
    """
    queue = deque([goal])
    self._closed_set.add(goal)

    while queue:
        # Get the oldest cell in the queue
        current = queue.popleft()

         # Get all the neighbors of the cell
        for n in self.env.get_neighbors(current):
            # The Wave value for each neighbor cell is the current value plus 1
            if not self.env.is_occupied(n) and n not in self._closed_set:
                self.wave_field.map[n.y, n.x] = self.wave_field.map[current.y, current.x] + self.env.get_cost(current, n)

                self._closed_set.add(n) # Add the visited cell to the closed set
                queue.append(n) # add the cell back into the queue so that it can be visited

compute_path(source, goal)

Computes the shortest path from the source GridCell to the goal GridCell using the Wave Front Algorithm.

Parameters:

Name Type Description Default
source GridCell

The source cell in the GridMap.

required
goal GridCell

The goal cell in the GridMap.

required

Returns:

Type Description
Path

A path from source to goal.

Source code in src/seally/planners/wave_front.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def compute_path(self, source: GridCell, goal: GridCell) -> Path:
    """
    Computes the shortest path from the source GridCell to the goal GridCell using the Wave Front Algorithm.

    Args:
        source: The source cell in the GridMap.
        goal: The goal cell in the GridMap.

    Returns:
        A path from source to goal.
    """

    # reset class variavbles
    self.wave_field = copy.deepcopy(self.env)
    self._closed_set = set()

    if not self.env.in_bounds(source) or self.env.is_occupied(source):
        raise Exception("Source is not a valid position")

    if not self.env.in_bounds(goal) or self.env.is_occupied(goal):
        raise Exception("Goal is not a valid position")

    # Set the goals distance value in the wave_feild
    self.wave_field.map[goal.y, goal.x] = self._goal_dist

    # Add the goal to the closed set
    self._closed_set.add(goal)

    # Expand a wave from the goal to the source position
    self.calc_wavefront(goal)

    path = []
    current = source

    # Traverse the Wave Feild from the source to the goal
    while current != goal:
        path.append(current)
        neighbors = self.env.get_neighbors(current)

        next_cell = None
        next_val = float('inf')

        # Find the neighbor with the lowest wave value
        for n in neighbors:
            if self.env.is_occupied(n):
                continue

            val = self.wave_field.map[n.y, n.x]
            if val > 0 and val < next_val:
                next_cell = n
                next_val = val

        # Path could not be found
        if next_cell is None or next_cell == current:
            raise Exception("Goal is unreachable from current position")

        # Visit Next cell
        current = next_cell

    path.append(goal)
    return path

Environments

Enviroment

seally.env.enviroment

Enviroment

Bases: ABC

Source code in src/seally/env/enviroment.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class Enviroment(ABC):
    @abstractmethod
    def get_cost(self, start: Position, end: Position) -> float:
        pass

    @abstractmethod
    def is_occupied(self, pos: Position) -> bool:
        """
        !TODO: ADD comment
        """
        pass

    @abstractmethod
    def in_bounds(self, pos: Position) -> bool:
        """
        !TODO Add Comment
        """
        pass

    @abstractmethod
    def get_neighbors(self, position: Position) -> List[Position]:
        """
        !TODO: Add Comment
        """ 
        pass

    """
    @abstractmethod
    def sample_position(self):
        TODO: Add Comment
        pass
    """

get_neighbors(position) abstractmethod

!TODO: Add Comment

Source code in src/seally/env/enviroment.py
44
45
46
47
48
49
@abstractmethod
def get_neighbors(self, position: Position) -> List[Position]:
    """
    !TODO: Add Comment
    """ 
    pass

in_bounds(pos) abstractmethod

!TODO Add Comment

Source code in src/seally/env/enviroment.py
37
38
39
40
41
42
@abstractmethod
def in_bounds(self, pos: Position) -> bool:
    """
    !TODO Add Comment
    """
    pass

is_occupied(pos) abstractmethod

!TODO: ADD comment

Source code in src/seally/env/enviroment.py
30
31
32
33
34
35
@abstractmethod
def is_occupied(self, pos: Position) -> bool:
    """
    !TODO: ADD comment
    """
    pass

Position

Bases: ABC

Cell in a GridMap

Source code in src/seally/env/enviroment.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Position(ABC):
    """
    Cell in a GridMap
    """
    def __init__(self, dims: Dimensions):
        """
        !TODO: Add comment

        dim_array = numpy array
        """
        self.dim_array = np.asarray(dims)

    def __hash__(self):
        return hash((self.x, self.y))

    def __eq__(self, other):
        return self.x == other.x and self.y == other.y

Grid Map

seally.env.grid_map

GridCell

Bases: Position

Cell in a GridMap

Source code in src/seally/env/grid_map.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class GridCell(Position):
    """
    Cell in a GridMap
    """
    def __init__(self, dims: Dimensions):
        """
        Initialize a GridCell Object.

        Args:
            dims: A Tuple of Integers representing an index into the Grid Map.
        """
        super().__init__(dims) 
        self.x = dims[0]
        self.y = dims[1]

    def __hash__(self):
        return hash((self.x, self.y))

    def __eq__(self, other):
        return self.x == other.x and self.y == other.y

GridMap

Bases: Enviroment

Grid based discritization of an enviroment. GridMaps store enviroments as a numpy array where the value at each cell is either 0 if the cell is free and 1 if it is occupied.

Source code in src/seally/env/grid_map.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class GridMap(Enviroment):
    """
    Grid based discritization of an enviroment. GridMaps store enviroments as a numpy array 
    where the value at each cell is either 0 if the cell is free and 1 if it is occupied.
    """
    def __init__(self, gen_random: bool = True, file_path: Optional[str] = None, move_4d: bool = False):
        """
        Initialize a GridMap Object.

        Args:
            gen_random: Boolean for if the Map should be generated randomly.
            file_path: Path to map file.
            move_4d: Boolean for if the neighborhood of a given cell includes the cells on its diagonals
        """
        if gen_random:
            pass
        else:
            if file_path is None:
                raise ValueError("file_path must be provided when gen_random is False")
            self.map = pd.read_csv(file_path).to_numpy()

        self.x_dim = self.map.shape[1]
        self.y_dim = self.map.shape[0]

        self._move_4d = move_4d


    def get_cost(self, source: Position, goal: Position) -> float:
        """
        Computes the cost of going from the source cell the goal cell. 
        For GridMaps the cost is sqrt(2) for diagonal cells and 1 otherwise.

        Args:
            source: The source cell in the GridMap.
            goal: The goal cell in the GridMap.

        Returns:
            The cost of going from source to goal.
        """
        if self._move_4d:
            return 0.0
        else:
            dx, dy = abs(source.dim_array[0] - goal.dim_array[0]), abs(source.dim_array[1] - goal.dim_array[1])
            return np.sqrt(2) if dx + dy == 2 else 1.0

    def is_occupied(self, cell: Position) -> bool:
        """
        Determines if the provided coordinates are in collision with an obstacle.

        Args:
            cell: Cell to check.

        Returns: 
            True if the position is in collision.
        """
        return self.map[cell.dim_array[1], cell.dim_array[0]] > 0

    def in_bounds(self, cell: Position) -> bool:
        """
        Determines if the provided coordinates within the bounds of the map.

        Args: 
            cell: Cell to check.

        Returns: 
            True if the position is in bounds.
        """

        if cell.dim_array[0] < 0 or cell.dim_array[0] > self.x_dim - 1:
            return False

        if cell.dim_array[1] < 0 or cell.dim_array[1] > self.y_dim - 1:
            return False

        return True

    def get_neighbors(self, cell: Position) -> List[Position]:
        """
        Retuns a list of all valid neighbors for a given cell in the Gripmap
        A valid cell is a cell that is within the bounds of the map.

        Args:
            cell: The cell whose neighbors to retrieve.

        Returns:
            A list of the cells valid neighbors.
        """

        # create a mask of the indicies of the neighboring cells relative to the current cell 
        shift_mask = None
        if self._move_4d:
            shift_mask = np.array([     [-1,0],
                                [0,-1],         [0,1],
                                        [1,0]])
        else:
            shift_mask = np.array([[-1,-1],[-1,0],[-1,1],
                                   [0,-1],         [0,1],
                                   [1,-1], [1,0], [1,1]])

        # broadcast the current cell to each of the shift mask offsets and add them to produce an array of neighbor coordinates
        neighbor_indicies = np.array([cell.dim_array[1], cell.dim_array[0]]) + shift_mask

        # create a mask to validate that all cells are in the bounds of the map
        bounds_mask = (
            (neighbor_indicies[:, 0] >= 0) & (neighbor_indicies[:, 0] < self.y_dim) &
            (neighbor_indicies[:, 1] >= 0) & (neighbor_indicies[:, 1] < self.x_dim)
        )

        # apply the bounds map to each coordinate and return only the cells that are in bounds
        return [GridCell((col, row)) for row, col in neighbor_indicies[bounds_mask]]

get_cost(source, goal)

Computes the cost of going from the source cell the goal cell. For GridMaps the cost is sqrt(2) for diagonal cells and 1 otherwise.

Parameters:

Name Type Description Default
source Position

The source cell in the GridMap.

required
goal Position

The goal cell in the GridMap.

required

Returns:

Type Description
float

The cost of going from source to goal.

Source code in src/seally/env/grid_map.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def get_cost(self, source: Position, goal: Position) -> float:
    """
    Computes the cost of going from the source cell the goal cell. 
    For GridMaps the cost is sqrt(2) for diagonal cells and 1 otherwise.

    Args:
        source: The source cell in the GridMap.
        goal: The goal cell in the GridMap.

    Returns:
        The cost of going from source to goal.
    """
    if self._move_4d:
        return 0.0
    else:
        dx, dy = abs(source.dim_array[0] - goal.dim_array[0]), abs(source.dim_array[1] - goal.dim_array[1])
        return np.sqrt(2) if dx + dy == 2 else 1.0

get_neighbors(cell)

Retuns a list of all valid neighbors for a given cell in the Gripmap A valid cell is a cell that is within the bounds of the map.

Parameters:

Name Type Description Default
cell Position

The cell whose neighbors to retrieve.

required

Returns:

Type Description
List[Position]

A list of the cells valid neighbors.

Source code in src/seally/env/grid_map.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def get_neighbors(self, cell: Position) -> List[Position]:
    """
    Retuns a list of all valid neighbors for a given cell in the Gripmap
    A valid cell is a cell that is within the bounds of the map.

    Args:
        cell: The cell whose neighbors to retrieve.

    Returns:
        A list of the cells valid neighbors.
    """

    # create a mask of the indicies of the neighboring cells relative to the current cell 
    shift_mask = None
    if self._move_4d:
        shift_mask = np.array([     [-1,0],
                            [0,-1],         [0,1],
                                    [1,0]])
    else:
        shift_mask = np.array([[-1,-1],[-1,0],[-1,1],
                               [0,-1],         [0,1],
                               [1,-1], [1,0], [1,1]])

    # broadcast the current cell to each of the shift mask offsets and add them to produce an array of neighbor coordinates
    neighbor_indicies = np.array([cell.dim_array[1], cell.dim_array[0]]) + shift_mask

    # create a mask to validate that all cells are in the bounds of the map
    bounds_mask = (
        (neighbor_indicies[:, 0] >= 0) & (neighbor_indicies[:, 0] < self.y_dim) &
        (neighbor_indicies[:, 1] >= 0) & (neighbor_indicies[:, 1] < self.x_dim)
    )

    # apply the bounds map to each coordinate and return only the cells that are in bounds
    return [GridCell((col, row)) for row, col in neighbor_indicies[bounds_mask]]

in_bounds(cell)

Determines if the provided coordinates within the bounds of the map.

Parameters:

Name Type Description Default
cell Position

Cell to check.

required

Returns:

Type Description
bool

True if the position is in bounds.

Source code in src/seally/env/grid_map.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def in_bounds(self, cell: Position) -> bool:
    """
    Determines if the provided coordinates within the bounds of the map.

    Args: 
        cell: Cell to check.

    Returns: 
        True if the position is in bounds.
    """

    if cell.dim_array[0] < 0 or cell.dim_array[0] > self.x_dim - 1:
        return False

    if cell.dim_array[1] < 0 or cell.dim_array[1] > self.y_dim - 1:
        return False

    return True

is_occupied(cell)

Determines if the provided coordinates are in collision with an obstacle.

Parameters:

Name Type Description Default
cell Position

Cell to check.

required

Returns:

Type Description
bool

True if the position is in collision.

Source code in src/seally/env/grid_map.py
73
74
75
76
77
78
79
80
81
82
83
def is_occupied(self, cell: Position) -> bool:
    """
    Determines if the provided coordinates are in collision with an obstacle.

    Args:
        cell: Cell to check.

    Returns: 
        True if the position is in collision.
    """
    return self.map[cell.dim_array[1], cell.dim_array[0]] > 0

Common

Heuristics

seally.common.heuristics

chebyshev_distance(source, goal)

Computes the Chebyshev distance between the source and goal positions.

Parameters:

Name Type Description Default
source Position

The source position.

required
goal Position

The goal position.

required

Returns:

Type Description
float

The Chebyshev distance between the source and goal positions.

Source code in src/seally/common/heuristics.py
32
33
34
35
36
37
38
39
40
41
42
43
def chebyshev_distance(source: Position, goal: Position) -> float:
    """
    Computes the Chebyshev distance between the source and goal positions.

    Args:
        source: The source position.
        goal: The goal position.

    Returns:
        The Chebyshev distance between the source and goal positions.
    """
    return float(np.linalg.norm(source.dim_array - goal.dim_array, ord=np.inf))

euclidean_distance(source, goal)

Computes the Euclidean distance between the source and goal positions.

Parameters:

Name Type Description Default
source Position

The source position.

required
goal Position

The goal position.

required

Returns:

Type Description
float

The Euclidean distance between the source and goal positions.

Source code in src/seally/common/heuristics.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
def euclidean_distance(source: Position, goal: Position) -> float:
    """
    Computes the Euclidean distance between the source and goal positions.

    Args:
        source: The source position.
        goal: The goal position.

    Returns:
        The Euclidean distance between the source and goal positions.
    """
    return float(np.linalg.norm(source.dim_array - goal.dim_array))

manhattan_distance(source, goal)

Computes the Manhattan distance between the source and goal positions.

Parameters:

Name Type Description Default
source Position

The source position.

required
goal Position

The goal position.

required

Returns:

Type Description
float

The Manhattan distance between the source and goal positions.

Source code in src/seally/common/heuristics.py
19
20
21
22
23
24
25
26
27
28
29
30
def manhattan_distance(source: Position, goal: Position) -> float:
    """
    Computes the Manhattan distance between the source and goal positions.

    Args:
        source: The source position.
        goal: The goal position.

    Returns:
        The Manhattan distance between the source and goal positions.
    """
    return float(np.sum(np.abs(source.dim_array - goal.dim_array)))