import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import random
import time
# Set seeds for reproducibility
random.seed(0)
np.random.seed(1)
# PathFinder class: Contains methods for Q-learning, SARSA, and SARSA(λ) to find the optimal path in the graph
class PathFinder(object):
def __init__(self, graph):
"""
Initialize the PathFinder with the graph.
Args:
- graph (networkx.Graph): The graph for which the pathfinding algorithm will be applied.
"""
self.graph = graph # Store the graph
self.num_nodes = len(graph.nodes) # Number of nodes in the graph
def q_learning(self, start_state=0, aim_state=10, num_epoch=500, gamma=0.8, epsilon=0.05, alpha=0.1):
"""
Perform Q-learning to find the shortest path from the start state to the aim state.
Args:
- start_state (int): The starting node of the path.
- aim_state (int): The destination node of the path.
- num_epoch (int): Number of episodes for training the Q-learning algorithm.
- gamma (float): Discount factor for future rewards (how much future rewards are valued).
- epsilon (float): Probability of choosing a random action (exploration vs exploitation).
- alpha (float): Learning rate (how much new information is learned at each step).
Returns:
- path (list): The path from start_state to aim_state learned by Q-learning.
"""
len_of_paths = [] # List to track the lengths of paths for each episode
q_table = np.zeros((self.num_nodes, self.num_nodes)) # Initialize Q-table (num_states x num_actions)
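        # q_table[s, a] scores the move from node s to node a; pairs with no connecting edge are
        # never proposed by epsilon_greedy, so those entries simply stay at zero.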
# Calculate 10% milestones for printing detailed info during training
epoch_thresholds = [int(i * num_epoch / 10) for i in range(1, 11)]
# Loop through each episode
for epoch in range(1, num_epoch + 1):
current_state = start_state # Start at the initial state
path = [current_state] # List to store the path
len_of_path = 0 # Track the length of the path
# Output progress for first episode and every 10% milestone
if epoch == 1 or epoch in epoch_thresholds:
print(f"Episode {epoch}/{num_epoch}")
print(f"Starting state: {current_state}")
# Loop until the goal state is reached
while True:
# Epsilon-Greedy action selection (either explore or exploit)
next_state = self.epsilon_greedy(current_state, q_table, epsilon=epsilon)
                s_next_next = self.epsilon_greedy(next_state, q_table, epsilon=-1.0)  # a negative epsilon always exploits, giving the greedy (max-Q) action from next_state
# Calculate the reward (negative edge weight)
reward = -self.graph[current_state][next_state]['weight']
# Q-table update using the Bellman equation
delta = reward + gamma * q_table[next_state, s_next_next] - q_table[current_state, next_state]
q_table[current_state, next_state] = q_table[current_state, next_state] + alpha * delta
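                # In symbols: Q(s, a) ← Q(s, a) + α·[r + γ·max_{a'} Q(s', a') − Q(s, a)];
                # s_next_next realizes the max because it was selected greedily above (off-policy target).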
# Output details for each step (if first epoch or one of the milestones)
if epoch == 1 or epoch in epoch_thresholds:
print(f" Current state: {current_state} -> Next state: {next_state}")
print(f" Edge weight (distance): {self.graph[current_state][next_state]['weight']} Reward: {reward}")
print(f" Updated Q-table: {q_table[current_state, next_state]:.4f}")
# Update the current state and accumulate the path length (cost)
current_state = next_state
len_of_path += -reward # Add the cost (negative of the reward) to the path length
path.append(current_state)
# If goal state is reached, break the loop
if current_state == aim_state:
if epoch == 1 or epoch in epoch_thresholds:
print(f" Goal state {aim_state} reached!")
break
            len_of_paths.append(len_of_path)  # Store this episode's path length (collected for inspection; not returned)
if epoch == 1 or epoch in epoch_thresholds:
print(f"Episode {epoch} completed. Path length: {len_of_path}")
print("--------------------------------------------------")
return path # Return the learned path
def sarsa(self, start_state=0, aim_state=10, num_epoch=500, gamma=0.8, epsilon=0.05, alpha=0.1):
"""
Perform SARSA to find the shortest path from the start state to the aim state.
Args:
- start_state (int): The starting node of the path.
- aim_state (int): The destination node of the path.
- num_epoch (int): Number of episodes for training the SARSA algorithm.
- gamma (float): Discount factor for future rewards (how much future rewards are valued).
- epsilon (float): Probability of choosing a random action (exploration vs exploitation).
- alpha (float): Learning rate (how much new information is learned at each step).
Returns:
- path (list): The path from start_state to aim_state learned by SARSA.
"""
len_of_paths = [] # List to track the lengths of paths for each episode
q_table = np.zeros((self.num_nodes, self.num_nodes)) # Initialize Q-table (num_states x num_actions)
# Calculate 10% milestones for printing detailed info during training
epoch_thresholds = [int(i * num_epoch / 10) for i in range(1, 11)]
# Loop through each episode
for epoch in range(1, num_epoch + 1):
current_state = start_state # Start at the initial state
path = [current_state] # List to store the path
len_of_path = 0 # Track the length of the path
            # Epsilon-Greedy selection of the initial action (explore with probability epsilon, otherwise exploit)
current_action = self.epsilon_greedy(current_state, q_table, epsilon)
# Output progress for first episode and every 10% milestone
if epoch == 1 or epoch in epoch_thresholds:
print(f"Episode {epoch}/{num_epoch}")
print(f"Starting state: {current_state}")
# Loop until the goal state is reached
while True:
# Perform action to move to next state
next_state = self.get_next_state(current_state, current_action)
reward = -self.graph[current_state][next_state]['weight'] # Reward is negative of edge weight
# Epsilon-Greedy action selection for the next state
next_action = self.epsilon_greedy(next_state, q_table, epsilon)
# SARSA Q-table update
delta = reward + gamma * q_table[next_state, next_action] - q_table[current_state, current_action]
q_table[current_state, current_action] = q_table[current_state, current_action] + alpha * delta
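                # On-policy update: unlike Q-learning's greedy max, the bootstrap term uses
                # next_action, the move the ε-greedy behavior policy will actually take from next_state.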
# Output details for each step (if first epoch or one of the milestones)
if epoch == 1 or epoch in epoch_thresholds:
print(f" Current state: {current_state} -> Next state: {next_state}")
print(f" Edge weight (distance): {self.graph[current_state][next_state]['weight']} Reward: {reward}")
print(f" Updated Q-table: {q_table[current_state, current_action]:.4f}")
# Update current state and action
current_state = next_state
current_action = next_action
len_of_path += -reward # Add the cost (negative of the reward) to the path length
path.append(current_state)
# If goal state is reached, break the loop
if current_state == aim_state:
if epoch == 1 or epoch in epoch_thresholds:
print(f" Goal state {aim_state} reached!")
break
            len_of_paths.append(len_of_path)  # Store this episode's path length (collected for inspection; not returned)
if epoch == 1 or epoch in epoch_thresholds:
print(f"Episode {epoch} completed. Path length: {len_of_path}")
print("--------------------------------------------------")
return path # Return the learned path
def sarsa_lambda(self, start_state=0, aim_state=10, num_epoch=500, gamma=0.8, epsilon=0.05, alpha=0.1, lambda_=0.9):
"""
Perform SARSA(λ) to find the shortest path from the start state to the aim state.
Args:
- start_state (int): The starting node of the path.
- aim_state (int): The destination node of the path.
- num_epoch (int): Number of episodes for training the SARSA(λ) algorithm.
- gamma (float): Discount factor for future rewards (how much future rewards are valued).
- epsilon (float): Probability of choosing a random action (exploration vs exploitation).
- alpha (float): Learning rate (how much new information is learned at each step).
- lambda_ (float): Eligibility trace decay factor (0 ≤ λ ≤ 1).
Returns:
- path (list): The path from start_state to aim_state learned by SARSA(λ).
"""
len_of_paths = [] # List to track the lengths of paths for each episode
q_table = np.zeros((self.num_nodes, self.num_nodes)) # Initialize Q-table (num_states x num_actions)
e_table = np.zeros((self.num_nodes, self.num_nodes)) # Initialize eligibility trace table
# Calculate 10% milestones for printing detailed info during training
epoch_thresholds = [int(i * num_epoch / 10) for i in range(1, 11)]
# Loop through each episode
for epoch in range(1, num_epoch + 1):
current_state = start_state # Start at the initial state
path = [current_state] # List to store the path
len_of_path = 0 # Track the length of the path
            # Epsilon-Greedy selection of the initial action (explore with probability epsilon, otherwise exploit)
current_action = self.epsilon_greedy(current_state, q_table, epsilon)
# Output progress for first episode and every 10% milestone
if epoch == 1 or epoch in epoch_thresholds:
print(f"Episode {epoch}/{num_epoch}")
print(f"Starting state: {current_state}")
# Loop until the goal state is reached
while True:
# Perform action to move to next state
next_state = self.get_next_state(current_state, current_action)
reward = -self.graph[current_state][next_state]['weight'] # Reward is negative of edge weight
# Epsilon-Greedy action selection for the next state
next_action = self.epsilon_greedy(next_state, q_table, epsilon)
# Update eligibility trace
e_table[current_state, current_action] += 1
# SARSA(λ) Q-table update with eligibility traces
delta = reward + gamma * q_table[next_state, next_action] - q_table[current_state, current_action]
q_table += alpha * delta * e_table # Update Q-table using eligibility traces
e_table *= gamma * lambda_ # Decay eligibility traces
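                # With accumulating traces, every state-action pair visited earlier in the episode
                # receives a share of this TD error, weighted by (γλ)^k for a visit k steps back;
                # λ = 0 recovers one-step SARSA, while λ → 1 approaches a Monte Carlo update.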
# Output details for each step (if first epoch or one of the milestones)
if epoch == 1 or epoch in epoch_thresholds:
print(f" Current state: {current_state} -> Next state: {next_state}")
print(f" Edge weight (distance): {self.graph[current_state][next_state]['weight']} Reward: {reward}")
print(f" Updated Q-table: {q_table[current_state, current_action]:.4f}")
# Update current state and action
current_state = next_state
current_action = next_action
len_of_path += -reward # Add the cost (negative of the reward) to the path length
path.append(current_state)
# If goal state is reached, break the loop
if current_state == aim_state:
if epoch == 1 or epoch in epoch_thresholds:
print(f" Goal state {aim_state} reached!")
break
            len_of_paths.append(len_of_path)  # Store this episode's path length (collected for inspection; not returned)
if epoch == 1 or epoch in epoch_thresholds:
print(f"Episode {epoch} completed. Path length: {len_of_path}")
print("--------------------------------------------------")
return path # Return the learned path
def epsilon_greedy(self, s_curr, q, epsilon):
"""
Epsilon-Greedy action selection: Choose a state based on exploration (random choice) or exploitation (best Q-value).
Args:
- s_curr (int): Current state.
- q (numpy.ndarray): Q-table storing the value of each state-action pair.
- epsilon (float): Probability of choosing a random action for exploration.
Returns:
- s_next (int): The next state chosen based on the epsilon-greedy strategy.
"""
# Get the neighbors (connected nodes) of the current state
        potential_next_states = np.where(np.array([self.graph.has_edge(s_curr, n) for n in range(self.num_nodes)]))
        potential_next_states = potential_next_states[0]  # np.where returns a tuple of index arrays; take the first (and only) one
# Epsilon-Greedy: Choose either exploration (random) or exploitation (best Q-value)
if random.random() > epsilon: # Exploitation: Choose the state with the highest Q-value
q_of_next_states = q[s_curr][potential_next_states] # Q-values of all next states
s_next = potential_next_states[np.argmax(q_of_next_states)] # Choose the state with the max Q-value
else: # Exploration: Choose a random neighboring state
s_next = random.choice(potential_next_states)
return s_next # Return the next state chosen
def get_next_state(self, state, action):
"""
Get the next state based on the current state and action.
Args:
- state (int): Current state.
- action (int): Action taken (next state to go).
Returns:
- next_state (int): The next state after taking the action.
"""
        return action  # In this graph formulation, an action is identified with the neighboring node it moves the agent to
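# A minimal sketch (hypothetical helper, not part of the original class): the three methods above
# return the trajectory of the final training episode, which can still contain exploratory detours.
# Given a trained q_table, the purely greedy route could instead be read off as below; the name,
# signature, and max_steps guard are illustrative assumptions.
def extract_greedy_path(graph, q_table, start, goal, max_steps=100):
    """Follow the argmax-Q neighbor from start until goal (max_steps guards against cycles)."""
    path = [start]
    state = start
    for _ in range(max_steps):
        if state == goal:
            break
        neighbors = list(graph.neighbors(state))
        state = max(neighbors, key=lambda n: q_table[state, n])  # greedy neighbor by Q-value
        path.append(state)
    return path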
def plot_graph(G, title, result_path=None, start_node=None, aim_node=None):
"""
Function to plot the graph with nodes and edges, with optional highlighting of the result path.
Args:
- G (networkx.Graph): The graph to plot.
- title (str): The title for the plot.
- result_path (list, optional): The list of nodes representing the learned path, if available.
- start_node (int, optional): The starting node to be highlighted.
- aim_node (int, optional): The aim (goal) node to be highlighted.
This function visualizes the graph, optionally highlights the nodes and edges based on the provided
result_path, and displays the start and aim nodes with distinct colors.
"""
# Set up the plot with a specific figure size
plt.figure(figsize=(10, 6))
# Position nodes using a force-directed layout
pos = nx.spring_layout(G)
# Get edge weights as labels (for display on the plot)
edge_labels = nx.get_edge_attributes(G, 'weight')
    # If result_path is provided, highlight the edges of the learned path
    if result_path:
        # Normalize each path edge as a (min, max) tuple so an undirected edge matches either orientation
        path_edges = {(min(result_path[i], result_path[i + 1]), max(result_path[i], result_path[i + 1]))
                      for i in range(len(result_path) - 1)}
        edge_colors = ['#99cd16' if (min(u, v), max(u, v)) in path_edges else 'gray' for u, v in G.edges()]
else:
edge_colors = ['gray' for u, v in G.edges()] # Default edge color if no result path
# Define node colors: red for start_node, yellow for aim_node, light blue for others
node_colors = []
for node in G.nodes():
if node == start_node:
node_colors.append('#cd1666') # Start node as red
elif node == aim_node:
node_colors.append('#e5e21a') # Aim node as yellow
else:
node_colors.append('#16accd') # Default color for other nodes (light blue)
# Plot the graph: nodes, labels, edges, and edge weights
nx.draw_networkx_nodes(G, pos, node_color=node_colors, node_size=500) # Customize node colors and size
nx.draw_networkx_labels(G, pos, font_size=12, font_weight='bold', font_color='black') # Label the nodes
nx.draw_networkx_edges(G, pos, edge_color=edge_colors, width=2) # Draw edges with specified color
nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels) # Draw edge labels (weights)
# Remove the axis (black border)
ax = plt.gca()
ax.set_axis_off() # This removes the axis lines and ticks
# Display the plot with a title
plt.title(title)
plt.show() # Show the plot
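# Optional sanity check (a sketch, not called anywhere in this script): networkx ships an exact
# Dijkstra solver, so a learned path can be validated against the true shortest path and length.
def dijkstra_reference(G, start_node, aim_node):
    return nx.dijkstra_path(G, start_node, aim_node), nx.dijkstra_path_length(G, start_node, aim_node)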
def shortestpathfinder(edges, start_node=0, aim_node=10, RL_algorithm='q_learning', num_epoch=500, gamma=0.8, epsilon=0.05, alpha=0.1, lambda_=0.9, plot=True):
"""
Function to execute Q-learning, SARSA, or SARSA(λ) on the provided graph and visualize the results.
Args:
- edges (list): List of edges where each edge is a tuple (u, v, weight).
- start_node (int, optional): The start node for pathfinding.
- aim_node (int, optional): The destination node for pathfinding.
    - RL_algorithm (str, optional): The RL algorithm to use ('q_learning', 'sarsa', or 'sarsa_lambda').
- num_epoch (int, optional): Number of training episodes.
- gamma (float, optional): Discount factor for future rewards.
- epsilon (float, optional): Exploration rate.
- alpha (float, optional): Learning rate.
- lambda_ (float, optional): Eligibility trace decay factor (for SARSA(λ)).
- plot (bool, optional): Whether to plot the initial and final graph with the path.
This function constructs a graph from the provided edges, runs the specified reinforcement learning
algorithm (Q-learning, SARSA, or SARSA(λ)) to find the shortest path, and visualizes the graph and path.
"""
# Create the graph with weighted edges
G = nx.Graph()
    # Add the weighted edges; nx.Graph is undirected, so each edge is traversable in both directions
    for u, v, weight in edges:
        G.add_edge(u, v, weight=weight)
# Visualize the initial graph before Q-learning or SARSA starts (only if plot=True)
if plot:
plot_graph(G, "Initial graph visualization:", start_node=start_node, aim_node=aim_node)
# Instantiate PathFinder class to run reinforcement learning algorithms
rl = PathFinder(G)
# Record the start time for performance tracking
start_time = time.time()
# Perform Q-learning, SARSA, or SARSA(λ) based on the selected algorithm
if RL_algorithm == 'q_learning':
result_path = rl.q_learning(start_state=start_node, aim_state=aim_node, num_epoch=num_epoch, gamma=gamma, epsilon=epsilon, alpha=alpha)
elif RL_algorithm == 'sarsa':
result_path = rl.sarsa(start_state=start_node, aim_state=aim_node, num_epoch=num_epoch, gamma=gamma, epsilon=epsilon, alpha=alpha)
elif RL_algorithm == 'sarsa_lambda':
result_path = rl.sarsa_lambda(start_state=start_node, aim_state=aim_node, num_epoch=num_epoch, gamma=gamma, epsilon=epsilon, alpha=alpha, lambda_=lambda_)
else:
# Raise an error if an invalid RL_algorithm is provided
        raise ValueError("Currently, only 'q_learning', 'sarsa', and 'sarsa_lambda' are supported.")
# Record the end time for performance tracking
end_time = time.time()
# Calculate and print elapsed time
elapsed_time = end_time - start_time
print(f"Time taken to complete {RL_algorithm}: {elapsed_time:.2f} seconds")
# Print the learned path
print(f"Learned path from node {start_node} to node {aim_node}: {result_path}")
# Calculate the total path length (sum of edge weights along the path)
path_length = 0
for i in range(len(result_path) - 1):
u, v = result_path[i], result_path[i + 1]
path_length += G[u][v]['weight'] # Sum up the weights of the edges in the path
# Print the total path length
print(f"Path length: {path_length}")
# Visualize the graph with the learned path highlighted (if plot=True)
if plot:
plot_graph(G, f"\nGraph with the shortest path found by {RL_algorithm}:", result_path, start_node=start_node, aim_node=aim_node)
# Example of how to use the pathfinder function
if __name__ == '__main__':
# Define edges with weights (distance between nodes), where only one direction is provided
edges = [
(0, 1, 1), (0, 3, 1), (4, 2, 2), (4, 1, 2),
(4, 8, 1), (4, 9, 2), (2, 3, 1), (2, 6, 2),
(2, 5, 1), (5, 6, 2), (7, 8, 1), (7, 5, 2),
(2, 10, 3), (10, 6, 1), (8, 9, 3), (8, 11, 1),
(9, 11, 1) # Only one direction for the edges
]
# Run the pathfinder for the defined edges with custom arguments (example for Q-Learning)
shortestpathfinder(edges, start_node=0, aim_node=11, RL_algorithm='q_learning', epsilon=0.02, plot=True, num_epoch=300)
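    # The other learners run through the same entry point, e.g.:
    # shortestpathfinder(edges, start_node=0, aim_node=11, RL_algorithm='sarsa', num_epoch=300)
    # shortestpathfinder(edges, start_node=0, aim_node=11, RL_algorithm='sarsa_lambda', lambda_=0.9, num_epoch=300)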