
Tutorial 3: Neural network modularity

Week 2, Day 1: Macrocircuits

By Neuromatch Academy

Content creators: Ruiyi Zhang

Content reviewers: Xaq Pitkow, Hlib Solodzhuk, Patrick Mineault, Alex Murphy

Production editors: Konstantine Tsafatinos, Ella Batty, Spiros Chavlis, Samuele Bolotta, Hlib Solodzhuk, Alex Murphy


Tutorial objectives

Estimated timing of tutorial: 1 hour

This tutorial will exemplify the importance of modularity in neural network architecture. We will train deep reinforcement learning (RL) agents with two types of neural network architectures, one modular and the other holistic, in a neuroscience navigation task.

As you have learned in previous lectures, better learning and generalization are important benefits of an appropriate inductive bias. This applies to both biological and artificial learning systems. Indeed, it has been shown that macaques trained in this navigation task can master the training task well and generalize to novel tasks derived from it. Since the brain is quite modular, it will be interesting to see if artificial models with a more modular architecture also allow for better learning and generalization than those with a less modular, holistic architecture.

Our learning objectives for today are:

  1. Build RL agents with different neural architectures for a spatial navigation task.

  2. Compare the differences in learning between RL agents with different architectures.

  3. Compare the differences in generalization between RL agents with different architectures.

  4. Use neural decoding to understand why the modular architecture for this specific task is advantageous.

  5. Keep the No-Free-Lunch Theorem in mind: the benefits of a modular architecture on one task do not necessarily extend to all possible tasks.

This tutorial is based on this paper.


Setup

Install and import feedback gadget

Source
# @title Install and import feedback gadget

!pip install vibecheck datatops --quiet
!pip install pandas --quiet
!pip install scikit-learn --quiet

from vibecheck import DatatopsContentReviewContainer
def content_review(notebook_section: str):
    return DatatopsContentReviewContainer(
        "",  # No text prompt
        notebook_section,
        {
            "url": "https://pmyvdlilci.execute-api.us-east-1.amazonaws.com/klab",
            "name": "neuromatch_neuroai",
            "user_key": "wb2cxze8",
        },
    ).render()

feedback_prefix = "W2D1_T3"

Imports

Source
# @title Imports

#plotting
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
from matplotlib.patches import Circle

#modeling
from scipy.stats import sem
from sklearn.linear_model import LinearRegression, Ridge
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd

#utils
from tqdm.notebook import tqdm
import pickle
import logging
import os
from pathlib import Path
import requests
import hashlib
import zipfile

Figure settings

Source
# @title Figure settings

logging.getLogger('matplotlib.font_manager').disabled = True

%matplotlib inline
%config InlineBackend.figure_format = 'retina' # perform high-definition rendering for images and plots
plt.style.use("https://raw.githubusercontent.com/NeuromatchAcademy/course-content/main/nma.mplstyle")

Helper functions

Source
# @title Helper functions

def my_tickformatter(value, pos):
    if abs(value) > 0 and abs(value) < 1:
        value = str(value).replace('0.', '.').replace('-', '\u2212')
    elif value == 0:
        value = 0
    elif int(value) == value:
        value = int(value)
    return value

def cart2pol(x, y):
    rho = np.sqrt(x**2 + y**2)
    phi = np.arctan2(y, x)
    return rho, phi

def get_neural_response(agent, df):
    responses = []
    with torch.no_grad():
        for _, trial in df.iterrows():
            response = agent.actor.rnn(trial.state_input)[0]
            responses.append(response.squeeze(1))
        df['response'] = responses

def fit_decoder(trajectory, variables=['pos_x', 'pos_y'], train_frac=0.7):
    key = 'response'
    train_trajectory = trajectory[:round(len(trajectory) * train_frac)]
    train_X = np.vstack(train_trajectory[key])
    test_trajectory = trajectory[round(len(trajectory) * train_frac):]
    test_X = np.vstack(test_trajectory[key])

    y = train_trajectory[variables].values
    train_y = np.vstack([np.hstack([v for v in y[:, i]]) for i in range(y.shape[1])]).T
    y = test_trajectory[variables].values
    test_y = np.vstack([np.hstack([v for v in y[:, i]]) for i in range(y.shape[1])]).T

    decoder = Ridge(alpha=0.1)
    decoder.fit(train_X, train_y)

    return decoder, test_X, test_y

def filter_fliers(data, whis=1.5, return_idx=False):
    if not isinstance(data, list):
        data = [data]
    filtered_data = []; data_ides = []
    for value in data:
        Q1, Q2, Q3 = np.percentile(value, [25, 50, 75])
        lb = Q1 - whis * (Q3 - Q1); ub = Q3 + whis * (Q3 - Q1)
        filtered_data.append(value[(value > lb) & (value < ub)])
        data_ides.append(np.where((value > lb) & (value < ub))[0])
    if return_idx:
        return filtered_data, data_ides
    else:
        return filtered_data

Plotting functions

Source
# @title Plotting functions

major_formatter = FuncFormatter(my_tickformatter)
modular_c = 'lightseagreen'; holistic_c = 'salmon'
reward_c = 'rosybrown'; unreward_c = 'dodgerblue'

fontsize = 7
lw = 1

def set_violin_plot(vp, facecolor, edgecolor, linewidth=1, alpha=1, ls='-', hatch=r''):
    with plt.xkcd():
        plt.setp(vp['bodies'], facecolor=facecolor, edgecolor=edgecolor,
                 linewidth=linewidth, alpha=alpha, ls=ls, hatch=hatch)
        plt.setp(vp['cmins'], facecolor=facecolor, edgecolor=edgecolor,
                 linewidth=linewidth, alpha=alpha)
        plt.setp(vp['cmaxes'], facecolor=facecolor, edgecolor=edgecolor,
                 linewidth=linewidth, alpha=alpha)
        plt.setp(vp['cbars'], facecolor='None', edgecolor='None',
                 linewidth=linewidth, alpha=alpha)

        linecolor = 'k' if facecolor == 'None' else 'snow'
        if 'cmedians' in vp:
            plt.setp(vp['cmedians'], facecolor=linecolor, edgecolor=linecolor,
                     linewidth=linewidth, alpha=alpha, ls=ls)
        if 'cmeans' in vp:
            plt.setp(vp['cmeans'], facecolor=linecolor, edgecolor=linecolor,
                     linewidth=linewidth, alpha=alpha, ls=ls)

Set random seed

Source
# @title Set random seed

import random
import numpy as np

def set_seed(seed=None, seed_torch=True):
  if seed is None:
    seed = np.random.choice(2 ** 32)
  random.seed(seed)
  np.random.seed(seed)
  if seed_torch:
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

set_seed(seed = 42)

Data retrieval

Source
# @title Data retrieval

def download_file(fname, url, expected_md5):
    """
    Downloads a file from the given URL and saves it locally.
    """
    if not os.path.isfile(fname):
        try:
            r = requests.get(url)
        except requests.ConnectionError:
            print("!!! Failed to download data !!!")
            return
        if r.status_code != requests.codes.ok:
            print("!!! Failed to download data !!!")
            return
        if hashlib.md5(r.content).hexdigest() != expected_md5:
            print("!!! Data download appears corrupted !!!")
            return
        with open(fname, "wb") as fid:
            fid.write(r.content)

def extract_zip(zip_fname):
    """
    Extracts a ZIP file to the current directory.
    """
    with zipfile.ZipFile(zip_fname, 'r') as zip_ref:
        zip_ref.extractall(".")

# Details for the zip files to be downloaded and extracted
zip_files = [
    {
        "fname": "agents.zip",
        "url": "https://osf.io/v9xqp/download",
        "expected_md5": "2cd35da7ea34e10e6ed2e7c983f0b908"
    },
    {
        "fname": "training_curve.zip",
        "url": "https://osf.io/9kjy4/download",
        "expected_md5": "eb7e07398aa12bd0fd9cf507d9a142c6"
    }
]

# New addition for other files to be downloaded, specifically non-zip files
model_files = [
    {
        "fname": "holistic_dfs.pkl",
        "url": "https://osf.io/9h7tq/download",
        "expected_md5": "92d0adb175724641590a611c48d721cc"
    },
    {
        "fname": "holistic_dfs_2x.pkl",
        "url": "https://osf.io/ybdmp/download",
        "expected_md5": "173e13bea9c2bbe8737a40e0d36063d4"
    },
    {
        "fname": "modular_dfs.pkl",
        "url": "https://osf.io/apkym/download",
        "expected_md5": "38a4603464b3e4351bd56625d24d5e16"
    },
    {
        "fname": "modular_dfs_2x.pkl",
        "url": "https://osf.io/wqbe2/download",
        "expected_md5": "5f19bafa3308f25ca163c627b7d9f63f"
    }
]

# Process zip files: download and extract
for zip_file in zip_files:
    download_file(zip_file["fname"], zip_file["url"], zip_file["expected_md5"])
    extract_zip(zip_file["fname"])

# Process model files: download only
for model_file in model_files:
    download_file(model_file["fname"], model_file["url"], model_file["expected_md5"])

Video 1: Introduction

Youtube
Bilibili

Section 1: RL agents in a spatial navigation task

We will use a naturalistic virtual navigation task to train and test RL agents. This task was previously used to investigate the neural computations underlying macaques’ flexible behaviors.

Task setup

Video 2: Task Setup

Youtube
Bilibili

At the beginning of each trial, the subject is situated at the center of the ground plane facing forward; a target is presented at a random location within the field of view (distance: 100 to 400 cm, angle: $-35^{\circ}$ to $+35^{\circ}$) on the ground plane and disappears after 300 ms. The subject can freely control its linear and angular velocities with a joystick (maximum: 200 cm/s and $90^{\circ}$/s, referred to as the joystick gain) to move along its heading in the virtual environment. The objective is to navigate toward the memorized target location and then stop inside the reward zone, a circular region centered at the target location with a radius of 65 cm. A reward is given only if the subject stops inside the reward zone (see figure below).

The subject’s self-location is not directly observable because there are no stable landmarks; instead, the subject needs to use optic flow cues on the ground plane to perceive self-motion and perform path integration. Each textural element of the optic flow, an isosceles triangle, appears at random locations and orientations, disappearing after only a short lifetime, making it impossible to use as a stable landmark. A new trial starts after the subject stops moving.

Navigation task scheme

Task modeling

Video 3: Task Modeling

Youtube
Bilibili

We formulate this task as a Partially Observable Markov Decision Process (POMDP) in discrete time, with continuous state and action spaces (see figure below). At each time step $t$, the environment is in the state $\boldsymbol{s}_t$ (including the agent's position and velocity and the target's position). The agent takes an action $\boldsymbol{a}_t$ (controlling its linear and angular velocities) to update $\boldsymbol{s}_t$ to the next state $\boldsymbol{s}_{t+1}$ following the environmental dynamics given by the transition probability $T(\boldsymbol{s}_{t+1}|\boldsymbol{s}_{t},\boldsymbol{a}_{t})$, and receives a reward $r_t$ from the environment following the reward function $R(\boldsymbol{s}_{t},\boldsymbol{a}_{t})$ (1 if the agent stops inside the reward zone, otherwise 0).
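This interaction loop can be sketched in a few lines. Everything below is a stand-in (a toy `transition`, a toy `reward_fn`, and a placeholder proportional policy rather than a trained actor); only the stop threshold and reward radius mirror the Config defined later in this tutorial:

```python
import numpy as np

rng = np.random.default_rng(0)
GOAL_RADIUS = 65 / 400            # reward-zone radius in arena units (65 cm / 400 cm-per-unit)
TERMINAL_ACTION = 0.1             # stop threshold, as in the Config below

def transition(state, action, dt=0.1):
    """Toy stand-in for T(s_{t+1} | s_t, a_t): noisy position update."""
    return state + action * dt + rng.normal(0.0, 0.01, size=2)

def reward_fn(state, target, stopped):
    """R(s_t, a_t): 1 only when the agent stops inside the reward zone."""
    return float(stopped and np.linalg.norm(state - target) < GOAL_RADIUS)

state = np.zeros(2)               # start at the origin
target = np.array([0.3, 0.3])
total_reward = 0.0
for t in range(35):               # EPISODE_LEN = 3.5 s / 0.1 s
    action = np.clip(target - state, -1, 1)    # placeholder policy, not a trained actor
    stopped = np.abs(action).max() < TERMINAL_ACTION
    total_reward += reward_fn(state, target, stopped)
    if stopped:
        break
    state = transition(state, action)
```

With this placeholder policy the agent drifts toward the target and earns the reward once its action falls below the stop threshold inside the reward zone.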

Actor-critic model

We use a model-free actor-critic approach to learning, with the actor and critic implemented using distinct neural networks:

  • The actor: At each $t$, the actor receives two sources of input $\boldsymbol{i}_t$ about the state: the observation $\boldsymbol{o}_t$ and the last action $\boldsymbol{a}_{t-1}$. It then outputs an action $\boldsymbol{a}_t$, aiming to maximize the state-action value $Q_t$. This value is a function of the state and action, representing the expected discounted rewards when an action is taken at a state, and future rewards are then accumulated from $t$ until the trial's last step.

  • The critic: Since the ground-truth value is unknown, the critic is used to approximate the value. In addition to receiving the same inputs as the actor to infer the state, the critic also takes as input the action $\boldsymbol{a}_t$ taken by the actor in this state. It then outputs the estimated $Q_t$ for this action, trained through the temporal-difference reward prediction error (TD error) $|r_t+\gamma Q_{t+1}-Q_{t}|$ after receiving the reward $r_t$, where $\gamma$ denotes the temporal discount factor.
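The TD-error update can be made concrete with a minimal sketch. The two-layer `critic`, its input dimensions, and the value of `gamma` below are illustrative assumptions, not the tutorial's actual critic:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
gamma = 0.97                      # temporal discount factor (illustrative value)

# Toy critic: maps a (belief, action) pair to a scalar Q estimate.
critic = nn.Sequential(nn.Linear(4, 32), nn.ReLU(), nn.Linear(32, 1))
optimizer = torch.optim.Adam(critic.parameters(), lr=1e-3)

# One transition: (b_t, a_t) --r_t--> (b_{t+1}, a_{t+1})
b_t, a_t = torch.randn(2), torch.randn(2)
b_next, a_next = torch.randn(2), torch.randn(2)
r_t = torch.tensor(0.0)

q_t = critic(torch.cat([b_t, a_t]))
with torch.no_grad():             # bootstrap target: no gradient through Q_{t+1}
    q_next = critic(torch.cat([b_next, a_next]))
td_error = r_t + gamma * q_next - q_t        # reward prediction error
loss = td_error.pow(2).mean()                # squared TD error as the critic loss

optimizer.zero_grad()
loss.backward()
optimizer.step()
```

Minimizing the squared TD error drives $Q_t$ toward the bootstrapped target $r_t + \gamma Q_{t+1}$.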

The state $\boldsymbol{s}_t$ is not fully observable, so the agent must maintain an internal state representation (belief $b_t$) for deciding $\boldsymbol{a}_t$ and $Q_t$. Both actor and critic undergo end-to-end training through back-propagation without explicit objectives for shaping $b_t$. Consequently, networks are free to learn diverse forms of $b_t$ encoded in their neural activities that aid them in achieving their learning objectives. Ideally, networks may develop an effective belief update rule, using the two sources of evidence in the inputs $\boldsymbol{i}_t=\{\boldsymbol{o}_t, \boldsymbol{a}_{t-1}\}$. They may predict the state $\boldsymbol{s}_t$ based on an internal model of the dynamics, the previous belief $b_{t-1}$, and the last self-action $\boldsymbol{a}_{t-1}$ (e.g., a motor efference copy). The second source is a partial and noisy observation $\boldsymbol{o}_t$ of $\boldsymbol{s}_t$ drawn from the observation probability $O(\boldsymbol{o}_t|\boldsymbol{s}_t)$. Note that the actual $O$ in the brain for this task is unknown. For simplicity, we model $\boldsymbol{o}_t$ as a low-dimensional vector, including the target's location when visible (the first 300 ms, $\Delta t=0.1$ s), and the agent's observation of its velocities through optic flow, with velocities subject to additive Gaussian noise.

Video 4: Task Parameters

Youtube
Bilibili

Given this task modeling, we first specify some parameters.

Parameters definition

Source
# @title Parameters definition

class Config():
    def __init__(self):
        self.STATE_DIM = 5             # dimension of agent's state: x position, y position, heading, linear vel. v, angular vel. w
        self.ACTION_DIM = 2            # dimension of agent's action: action_v, action_w
        self.OBS_DIM = 2               # dimension of agent's observation of its movement: observation_v, observation_w
        self.TARGET_DIM = 2            # dimension of target's position: target_x, target_y

        self.TERMINAL_ACTION = 0.1     # start/stop threshold
        self.DT = 0.1                  # discretization time step
        self.EPISODE_TIME = 3.5        # max trial duration in seconds
        self.EPISODE_LEN = int(self.EPISODE_TIME / self.DT)   # max trial duration in steps

        self.LINEAR_SCALE = 400        # cm/unit
        self.goal_radius = torch.tensor([65]) / self.LINEAR_SCALE    # reward zone radius
        self.initial_radius_range = np.array([100, 400]) / self.LINEAR_SCALE   # range of target distance
        self.relative_angle_range = np.deg2rad([-35, 35])                      # range of target angle
        self.process_gain_default = torch.tensor([200 / self.LINEAR_SCALE, torch.deg2rad(torch.tensor(90.))]) # joystick gain
        self.target_offT = 3        # time step at which the target becomes invisible (after the first 300 ms by default)

        self.pro_noise_std_ = 0.2         # process noise std
        self.obs_noise_std_ = 0.1         # observation noise std

Coding Exercise 1: Task environment

Video 5: Task Environment

Youtube
Bilibili

We then define the task environment in the following code. You can see from the code how a target is sampled for each trial. You will need to fill in the missing code to define the task dynamics $T(\boldsymbol{s}_{t+1}|\boldsymbol{s}_{t},\boldsymbol{a}_{t})$.

The code for the dynamics implements the following mathematical equations:

$$
\begin{aligned}
s_{x_{t+1}} &= s_{x_{t}} + s_{v_{t}} \cos(s_{\theta_t})\,\Delta t \\
s_{y_{t+1}} &= s_{y_{t}} + s_{v_{t}} \sin(s_{\theta_t})\,\Delta t \\
s_{\theta_{t+1}} &= s_{\theta_{t}} + s_{\omega_{t}}\,\Delta t \\
s_{v_{t+1}} &= G_{v} a_{v_t} + \eta_{v_t} \\
s_{\omega_{t+1}} &= G_{\omega} a_{\omega_t} + \eta_{\omega_t}
\end{aligned}
$$

where the state elements include:

  • x position $s_{x_t}$

  • y position $s_{y_t}$

  • heading $s_{\theta_t}$

  • linear velocity $s_{v_t}$

  • angular velocity $s_{\omega_t}$

$G_{v}$ and $G_{\omega}$ are the joystick gains mapping actions to linear and angular velocities. $\eta_{v_t}$ and $\eta_{\omega_t}$ are process noises.
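These update equations can be checked with a standalone sketch (plain NumPy, with illustrative gains and the noise set to zero; the tutorial's `Env.dynamics` implements the same update in PyTorch with sampled process noise):

```python
import numpy as np

def unicycle_step(state, action, gain, dt=0.1, noise=(0.0, 0.0)):
    """One step of the unicycle dynamics written out above."""
    x, y, theta, v, w = state
    a_v, a_w = action
    x_new = x + v * np.cos(theta) * dt          # position moves along the heading
    y_new = y + v * np.sin(theta) * dt
    theta_new = theta + w * dt                  # heading integrates angular velocity
    v_new = gain[0] * a_v + noise[0]            # velocities are gain-scaled actions plus noise
    w_new = gain[1] * a_w + noise[1]
    return np.array([x_new, y_new, theta_new, v_new, w_new])

s0 = np.array([0.0, 0.0, np.pi / 2, 0.5, 0.0])  # facing +y, moving at 0.5 units/s
s1 = unicycle_step(s0, action=(1.0, 0.0), gain=(0.5, np.deg2rad(90)))
print(s1)  # y increases by 0.5 * 0.1 = 0.05; x and heading are unchanged
```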

class Env(nn.Module):
    def __init__(self, arg):
        """
        Initializes the environment with given arguments.

        Inputs:
        - arg (object): An object containing initialization parameters.

        Outputs:
        - None
        """
        super().__init__()
        self.__dict__.update(arg.__dict__)

    def reset(self, target_position=None, gain=None):
        """
        Resets the environment to start a new trial.

        Inputs:
        - target_position (tensor, optional): The target position for the trial. If None, a position is sampled.
        - gain (tensor, optional): The joystick gain. If None, the default gain is used.

        Outputs:
        - initial_state (tensor): The initial state of the environment.
        """
        # sample target position
        self.target_position = target_position
        if target_position is None:
            target_rel_r = torch.sqrt(torch.zeros(1).uniform_(*self.initial_radius_range**2))
            target_rel_ang = torch.zeros(1).uniform_(*self.relative_angle_range)
            rel_phi = np.pi/2 - target_rel_ang
            target_x = target_rel_r * torch.cos(rel_phi)
            target_y = target_rel_r * torch.sin(rel_phi)
            self.target_position = torch.tensor([target_x, target_y]).view([-1, 1])
        self.target_position_obs = self.target_position.clone()

        # joystick gain
        self.gain = gain
        if gain is None:
            self.gain = self.process_gain_default

        # process noise std
        self.pro_noise_std = self.gain * self.pro_noise_std_

        return torch.tensor([0, 0, np.pi / 2, 0, 0]).view([-1, 1])  # return the initial state

    def forward(self, x, a, t):
        """
        Updates the state based on the current state, action, and time.

        Inputs:
        - x (tensor): The current state of the environment.
        - a (tensor): The action taken.
        - t (int): The current time step.

        Outputs:
        - next_x (tensor): The next state of the environment.
        - reached_target (bool): Whether the target has been reached.
        """
        if t == self.target_offT:
            self.target_position_obs *= 0  # make target invisible

        relative_dist = torch.dist(x[:2], self.target_position)
        reached_target = relative_dist < self.goal_radius
        next_x = self.dynamics(x, a.view(-1))  # update state based on environment dynamics

        return next_x, reached_target

    def dynamics(self, x, a):
        """
        Defines the environment dynamics.

        Inputs:
        - x (tensor): The current state of the environment.
        - a (tensor): The action taken.

        Outputs:
        - next_x (tensor): The next state of the environment.
        """
        # sample process noise
        eta = self.pro_noise_std * torch.randn(2)

        # there are five elements in the state
        px, py, heading_angle, lin_vel, ang_vel = torch.split(x.view(-1), 1)

        # update state: s_{t+1} = f(s_{t}, a_{t})
        ###################################################################
        ## Fill out the following then remove
        raise NotImplementedError("Student exercise: complete update for y position and angular velocity.")
        ###################################################################
        px_ = px + lin_vel * torch.cos(heading_angle) * self.DT
        # Hint: Mimic how the x position is updated. The y position update is similar,
        # but it requires the sine of 'heading_angle' instead of the cosine.
        py_ = ...
        heading_angle_ = heading_angle + ang_vel * self.DT
        lin_vel_ = self.gain[0] * a[0] + eta[0]
        # Hint: The variables 'self.gain', 'a', and 'eta' are two-dimensional.
        # The first dimension is for the linear component, and the second dimension is for the angular component.
        ang_vel_ = ...

        next_x = torch.stack([px_, py_, heading_angle_,
                              lin_vel_.reshape(1), ang_vel_.reshape(1)]).view([-1, 1])
        return next_x

    def is_stop(self, action):
        """
        Determines if the given action is a stop action.

        Inputs:
        - action (tensor): The action.

        Outputs:
        - stop (bool): Whether the action is a stop action.
        """
        stop = (action.abs() < self.TERMINAL_ACTION).all()
        return stop

Coding Exercise 2: RL agent

Agent observation

Next, we define the observation probability $O(\boldsymbol{o}_t|\boldsymbol{s}_t)$ from which the observation $\boldsymbol{o}_t$ is drawn.

The observation for self-movement is defined as

$$\boldsymbol{o}_t = H_t \boldsymbol{s}_t + \boldsymbol{\zeta}_t$$

where $\boldsymbol{\zeta}_t$ is zero-mean Gaussian observation noise, and the observation model $H_t$ is a matrix that only takes the velocity elements (linear and angular velocities) from the true state, filtering out positional elements as they are unobservable. Essentially, the observation of self-movement is a noisy version of the agent's linear and angular velocities.

Observation dynamics

Source
# @title Observation dynamics

class ObsStep(nn.Module):
    def __init__(self, arg):
        """
        Initializes the observation step with given arguments.

        Inputs:
        - arg (object): An object containing the parameters for state dimension, observation dimension, and observation noise standard deviation.

        Outputs:
        - None
        """
        super().__init__()
        self.STATE_DIM = arg.STATE_DIM
        self.OBS_DIM = arg.OBS_DIM
        self.obs_noise_std_ = arg.obs_noise_std_

        # observation matrix
        self.H = torch.zeros(self.OBS_DIM, self.STATE_DIM)
        self.H[0, -2] = 1
        self.H[1, -1] = 1

    def reset(self, gain):
        """
        Resets the observation noise standard deviation based on the given gain.

        Inputs:
        - gain (tensor): The gain used to scale the observation noise.

        Outputs:
        - None
        """
        self.obs_noise_std = self.obs_noise_std_ * gain

    def forward(self, x):
        """
        Computes the observation based on the current state and observation noise.

        Inputs:
        - x (tensor): The current state of the environment.

        Outputs:
        - o_t (tensor): The observed state with added noise.
        """
        zeta = (self.obs_noise_std * torch.randn(self.OBS_DIM)).view([-1, 1])
        o_t = self.H @ x + zeta
        return o_t

Actor-critic RL agent

Video 6: RL Agent

Youtube
Bilibili

Each RL agent requires an actor and a critic network. Actor and critic networks can have a variety of architectures. Our goal here is to investigate whether functionally specialized modules provide advantages for our task. Therefore, we designed architectures incorporating modules with distinct levels of specialization for comparison. The first architecture is a holistic actor/critic, comprising a single module where all neurons jointly compute the belief $b_t$ and the action $\boldsymbol{a}_t$/value $Q_t$. In contrast, the second architecture is a modular actor/critic, featuring modules specialized in computing different variables (see the figure below).

The specialization of each module is determined as follows.

First, we can confine the computation of beliefs. Since computing beliefs about the evolving state requires integrating evidence over time, a network capable of computing belief must possess some form of memory. Recurrent neural networks (RNNs) satisfy this requirement by using a hidden state that evolves over time. In contrast, computations of value and action do not need additional memory when the belief is provided, making memoryless multi-layer perceptrons (MLPs) sufficient. Consequently, adopting an architecture with an RNN followed by a memoryless MLP (modular actor/critic) ensures that the computation of belief is exclusively confined to the RNN.

Second, we can confine the computation of the state-action value $Q_t$ for the critic. Since a critic is trained end-to-end to compute $Q_t$, stacking two modules between all inputs and outputs does not limit the computation of $Q_t$ to a specific module. However, since $Q_t$ is a function of the action $\boldsymbol{a}_t$, we can confine the computation of $Q_t$ to the second module of the modular critic by supplying $\boldsymbol{a}_t$ only to the second module. This ensures that the first module, lacking access to the action, cannot accurately compute $Q_t$. Therefore, the modular critic's RNN is dedicated to computing $b_t$ and sends it to the MLP dedicated to computing $Q_t$. This architecture enforces modularity.
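The tutorial presents only actor code, but the confinement described here can be sketched as a critic network in which the action enters only the second module. The class name and layer sizes below are illustrative assumptions, not the paper's actual critic:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class ModularCriticSketch(nn.Module):
    def __init__(self, input_dim=6, action_dim=2, rnn_size=128, mlp_size=300):
        super().__init__()
        self.rnn = nn.LSTM(input_size=input_dim, hidden_size=rnn_size)  # belief module: never sees a_t
        self.l1 = nn.Linear(rnn_size + action_dim, mlp_size)            # Q module: sees b_t and a_t
        self.l2 = nn.Linear(mlp_size, 1)

    def forward(self, x, action, hidden_in):
        b, hidden_out = self.rnn(x, hidden_in)                # b_t computed without access to a_t
        q = F.relu(self.l1(torch.cat([b, action], dim=-1)))   # a_t is injected only here
        return self.l2(q), hidden_out

critic = ModularCriticSketch()
x = torch.randn(1, 1, 6); a = torch.randn(1, 1, 2)
h = (torch.zeros(1, 1, 128), torch.zeros(1, 1, 128))
q, h_out = critic(x, a, h)
print(q.shape)  # torch.Size([1, 1, 1])
```

Because `self.rnn` receives no action input, only the MLP downstream of the concatenation can compute an action-dependent value.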

Like the critic, the modular actor has higher specialization than the holistic actor, which lacks a confined $b_t$ computation. Thought bubbles in the figure below denote the variables that can be computed within each module as enforced through the architecture, rather than indicating that each variable is encoded only in that module. For example, $b_t$ in the modular architectures is passed to the second module, but an accurate $b_t$ computation can only be completed in the first (RNN) module.

Actor-critic architecture for holistic and modular representations

We will compare two agents: the modular agent, which uses modular actor and modular critic networks, and the holistic agent, which uses holistic actor and holistic critic networks.

For simplicity, we will only present the code for actors here.

class Agent():
    def __init__(self, arg, Actor):
        """
        Initializes the agent with given arguments and an actor model.

        Inputs:
        - arg (object): An object containing initialization parameters such as observation dimension, action dimension, and target dimension.
        - Actor (class): The actor model class to be used by the agent.

        Outputs:
        - None
        """
        self.__dict__.update(arg.__dict__)

        self.actor = Actor(self.OBS_DIM, self.ACTION_DIM, self.TARGET_DIM)
        self.obs_step = ObsStep(arg)

    def select_action(self, state_input, hidden_in):
        """
        Selects an action based on the current state input and hidden state.

        Inputs:
        - state_input (tensor): The current state input to the actor model.
        - hidden_in (tensor): The hidden state input to the actor model.

        Outputs:
        - action (tensor): The action selected by the actor model.
        - hidden_out (tensor): The updated hidden state from the actor model.
        """
        with torch.no_grad():
            action, hidden_out = self.actor(state_input, hidden_in)
        return action, hidden_out

    def load(self, data_path, filename):
        """
        Loads the actor model parameters from a file.

        Inputs:
        - data_path (Path): The path to the directory containing the file.
        - filename (str): The name of the file to load the parameters from.

        Outputs:
        - None
        """
        self.filename = filename
        file = data_path / f'{self.filename}.tar'
        params = torch.load(file)
        self.actor.load_state_dict(params['actor_dict'])

Holistic actor

We define the holistic actor as follows. Fill in the missing code to define the holistic architecture.

class HolisticActor(nn.Module):
    def __init__(self, OBS_DIM, ACTION_DIM, TARGET_DIM):
        """
        Initializes the holistic actor model with given dimensions.

        Inputs:
        - OBS_DIM (int): The dimension of the observation input.
        - ACTION_DIM (int): The dimension of the action output.
        - TARGET_DIM (int): The dimension of the target input.

        Outputs:
        - None
        """
        super().__init__()
        self.OBS_DIM = OBS_DIM
        self.ACTION_DIM = ACTION_DIM
        self.RNN_SIZE = 220  # RNN hidden size

        self.rnn = nn.LSTM(input_size=OBS_DIM + ACTION_DIM + TARGET_DIM, hidden_size=self.RNN_SIZE)
        self.l1 = nn.Linear(self.RNN_SIZE, ACTION_DIM)

    def forward(self, x, hidden_in):
        """
        Computes the action based on the current input and hidden state.

        Inputs:
        - x (tensor): The current input to the model, which includes observation, action, and target information.
        - hidden_in (tuple): The initial hidden state for the LSTM.

        Outputs:
        - a (tensor): The action output from the model.
        - hidden_out (tuple): The updated hidden state from the LSTM.
        """
        ###################################################################
        ## Fill out the following then remove
        raise NotImplementedError("Student exercise: complete forward propagation through holistic actor.")
        ###################################################################
        #######################################################
        # TODO: Pass the input 'x' and the previous hidden state 'hidden_in' to the RNN module 'self.rnn'.
        # Get the output 'x' and the hidden state 'hidden_out' from the RNN module.
        # Refer to https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html.
        # Hint: 'self.rnn' takes two arguments as inputs and outputs two things.
        # The first position corresponds to 'x', and the second position corresponds to the hidden state.
        #######################################################
        x, hidden_out = ...

        a = torch.tanh(self.l1(x))
        return a, hidden_out

Test your implementation of `HolisticActor`!

Source
# @title Test your implementation of `HolisticActor'!
set_seed(0)

arg = Config()
actor_test = HolisticActor(arg.OBS_DIM, arg.ACTION_DIM, arg.TARGET_DIM)

x_test = torch.randn(1, 1, 6); hidden_in_test = (torch.randn(1, 1, 220), torch.randn(1, 1, 220))
a, hidden_out = actor_test(x_test, hidden_in_test)

if torch.norm(a.reshape(-1) - torch.tensor([0.1145, -0.1694])) < 1e-2:
    print('Your function is correct!')
else:
    print('Your function is incorrect!')

Modular actor

We define the modular actor as follows. Fill in the missing code to define the modular architecture.

class ModularActor(nn.Module):
    def __init__(self, OBS_DIM, ACTION_DIM, TARGET_DIM):
        """
        Initializes the modular actor model with given dimensions.

        Inputs:
        - OBS_DIM (int): The dimension of the observation input.
        - ACTION_DIM (int): The dimension of the action output.
        - TARGET_DIM (int): The dimension of the target input.

        Outputs:
        - None
        """
        super().__init__()
        self.OBS_DIM = OBS_DIM
        self.ACTION_DIM = ACTION_DIM
        self.RNN_SIZE = 128  # RNN hidden size
        MLP_SIZE = 300  # number of neurons in one MLP layer

        self.rnn = nn.LSTM(input_size=OBS_DIM + ACTION_DIM + TARGET_DIM, hidden_size=self.RNN_SIZE)
        self.l1 = nn.Linear(self.RNN_SIZE, MLP_SIZE)
        self.l2 = nn.Linear(MLP_SIZE, MLP_SIZE)
        self.l3 = nn.Linear(MLP_SIZE, ACTION_DIM)

    def forward(self, x, hidden_in):
        """
        Computes the action based on the current input and hidden state.

        Inputs:
        - x (tensor): The current input to the model, which includes observation, action, and target information.
        - hidden_in (tuple): The initial hidden state for the LSTM.

        Outputs:
        - a (tensor): The action output from the model.
        - hidden_out (tuple): The updated hidden state from the LSTM.
        """
        ###################################################################
        ## Fill out the following then remove
        raise NotImplementedError("Student exercise: complete forward propagation through modular actor.")
        ###################################################################
        #######################################################
        # TODO: Pass 'x' to the MLP module, which consists of two linear layers with ReLU nonlinearity.
        # First, pass 'x' to the first linear layer, 'self.l1', followed by 'F.relu'.
        # Second, pass 'x' again to the second linear layer, 'self.l2', followed by 'F.relu'.
        #######################################################
        x, hidden_out = self.rnn(x, hidden_in)
        x = ...
        x = ...

        a = torch.tanh(self.l3(x))

        return a, hidden_out

Test your implementation of `ModularActor`!

Source
# @title Test your implementation of `ModularActor`!
set_seed(0)

arg = Config()
actor_test = ModularActor(arg.OBS_DIM, arg.ACTION_DIM, arg.TARGET_DIM)

x_test = torch.randn(1, 1, 6); hidden_in_test = (torch.randn(1, 1, 128), torch.randn(1, 1, 128))
a, hidden_out = actor_test(x_test, hidden_in_test)

if torch.norm(a.reshape(-1) - torch.tensor([0.0068, 0.0307])) < 1e-2:
    print('Your function is correct!')
else:
    print('Your function is incorrect!')

Coding Exercise 2 Discussion

  1. To ensure a fair comparison, the total number of trainable parameters is designed to be similar between the two architectures. How many trainable parameters are there in each architecture?
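
One way to tally the parameters yourself is a quick back-of-the-envelope sketch. This assumes PyTorch's single-layer LSTM parameterization (each of the 4 gates has an input-to-hidden matrix, a hidden-to-hidden matrix, and two separate bias vectors, `b_ih` and `b_hh`) and the layer sizes defined above; the input size 6 matches `OBS_DIM + ACTION_DIM + TARGET_DIM` used in the test cells.

```python
def lstm_params(input_size, hidden_size):
    # 4 gates, each with: input-to-hidden weights (H x I),
    # hidden-to-hidden weights (H x H), and two bias vectors (H each).
    return 4 * hidden_size * (input_size + hidden_size + 2)

def linear_params(in_features, out_features):
    # Weight matrix plus bias vector.
    return in_features * out_features + out_features

IN = 6  # OBS_DIM + ACTION_DIM + TARGET_DIM, as in the test cells above

holistic = lstm_params(IN, 220) + linear_params(220, 2)
modular = (lstm_params(IN, 128) + linear_params(128, 300)
           + linear_params(300, 300) + linear_params(300, 2))
print(holistic, modular)  # both totals land close to 200,000
```

You can cross-check these counts against the actual models with `sum(p.numel() for p in model.parameters() if p.requires_grad)`.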


Section 2: Evaluate agents in the training task

Estimated timing to here from start of tutorial: 25 minutes

With the code for the environment and agents complete, we will now write an evaluation function that lets an agent interact with the environment so its performance can be assessed.

Coding Exercise 3: Evaluation function

We first sample 1,000 targets for the RL agent to steer towards.

Sample 1000 targets

Source
# @title Sample 1000 targets
arg = Config()

set_seed(0)
env = Env(arg)
target_positions = []

for _ in range(1000):
    __ = env.reset()
    target_positions.append(env.target_position)

Then, we define the evaluation function. For each target, the agent takes multiple steps to steer toward it. At each step, the agent selects an action based on its state-related input, and the environment updates according to that action.

Fill in the missing code that calculates the reward.

def evaluation(agent, gain_factor=1):
    """
    Evaluates the agent's performance in the environment.

    Inputs:
    - agent (Agent): The agent to be evaluated.
    - gain_factor (float): A factor to scale the process gain. Default is 1.

    Outputs:
    - results (DataFrame): A DataFrame containing evaluation results with columns:
      - 'pos_x': List of x positions over episodes.
      - 'pos_y': List of y positions over episodes.
      - 'pos_r_end': List of final radial distances from origin over episodes.
      - 'target_x': List of target x positions.
      - 'target_y': List of target y positions.
      - 'target_r': List of target radial distances.
      - 'rewarded': List of binary rewards indicating if the target was reached and stopped.
      - 'state_input': List of state inputs recorded during the episodes.
    """
    set_seed(0)
    env = Env(arg)

    pos_x = []; pos_y = []; pos_r_end = []
    target_x = []; target_y = []; target_r = []
    rewarded = []; state_input_ = []

    for target_position in tqdm(target_positions):
        state = env.reset(target_position=target_position, gain=arg.process_gain_default * gain_factor)
        agent.obs_step.reset(env.gain)

        state_input = torch.cat([torch.zeros([1, 1, arg.OBS_DIM]), torch.zeros([1, 1, arg.ACTION_DIM]),
                                 env.target_position_obs.view(1, 1, -1)], dim=2)
        hidden_in = (torch.zeros(1, 1, agent.actor.RNN_SIZE), torch.zeros(1, 1, agent.actor.RNN_SIZE))

        state_inputs = []
        states = []

        for t in range(arg.EPISODE_LEN):
            # 1. Agent takes an action given the state-related input
            action, hidden_out = agent.select_action(state_input, hidden_in)

            # 2. Environment updates to the next state given state and action,
            #    as well as checking if the agent has reached the reward zone,
            #    and if the agent has stopped.
            next_state, reached_target = env(state, action, t)
            is_stop = env.is_stop(action)

            # 3. Receive reward
            ###################################################################
            ## Fill out the following then remove
            raise NotImplementedError("Student exercise: compute the reward.")
            ###################################################################
            # TODO: Compute the reward. The reward is '1' when the target is reached and the agent stops on it,
            # otherwise, the reward is '0'.
            # Hint: Use variables 'reached_target' and 'is_stop'.
            reward = ...

            # 4. Agent observes the next state and constructs the next state-related input
            next_observation = agent.obs_step(next_state)
            next_state_input = torch.cat([next_observation.view(1, 1, -1), action,
                                          env.target_position_obs.view(1, 1, -1)], dim=2)

            states.append(state)
            state_inputs.append(state_input)

            state_input = next_state_input
            state = next_state
            hidden_in = hidden_out

            # trial is done when the agent stops
            if is_stop:
                break

        # store data for each trial
        pos_x_, pos_y_, _, _, _ = torch.chunk(torch.cat(states, dim=1), state.shape[0], dim=0)
        pos_x.append(pos_x_.view(-1).numpy() * arg.LINEAR_SCALE)
        pos_y.append(pos_y_.view(-1).numpy() * arg.LINEAR_SCALE)

        pos_r, _ = cart2pol(pos_x[-1], pos_y[-1])
        pos_r_end.append(pos_r[-1])

        target_x.append(target_position[0].item() * arg.LINEAR_SCALE)
        target_y.append(target_position[1].item() * arg.LINEAR_SCALE)
        target_r_, _ = cart2pol(target_x[-1], target_y[-1])
        target_r.append(target_r_)

        state_input_.append(torch.cat(state_inputs))
        rewarded.append(reward.item())

    return pd.DataFrame().assign(pos_x=pos_x, pos_y=pos_y,
                                 pos_r_end=pos_r_end, target_x=target_x, target_y=target_y,
                                 target_r=target_r, rewarded=rewarded,
                                 state_input=state_input_)

Since training RL agents takes a lot of time, here we load pre-trained modular and holistic agents and evaluate both on the same sampled 1,000 targets. We will then store the evaluation data in a pandas DataFrame object.

modular_agent = Agent(arg, ModularActor)
modular_agent.load(Path('agents/modular'), 0)
modular_df = evaluation(modular_agent)
holistic_agent = Agent(arg, HolisticActor)
holistic_agent.load(Path('agents/holistic'), 0)
holistic_df = evaluation(holistic_agent)

Coding Exercise 4: Agent trajectory in a single trial

After evaluation, we want to visualize the agents’ steering trajectories. Fill in the missing code to compute the distance between the stop location and the target location. Given that the reward boundary is 65 cm, are these trials rewarded?

Modular agent trajectory

with plt.xkcd():
    trial_idx = 21
    trial = modular_df.iloc[trial_idx]

    fig = plt.figure(figsize=(2.2, 1.7), dpi=200)
    ax = fig.add_subplot(111)

    ax.set_aspect('equal')
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.spines['bottom'].set_visible(False)
    ax.spines['left'].set_visible(False)
    ax.axes.xaxis.set_ticks([]); ax.axes.yaxis.set_ticks([])

    # plot trajectory
    px = trial.pos_x; py = trial.pos_y
    ax.plot(px, py, lw=lw, c=modular_c)

    # plot target
    target_x = trial.target_x; target_y = trial.target_y
    print(f'Target distance from the start location: {np.around(trial.target_r, 1)} cm')

    ###################################################################
    ## Fill out the following then remove
    raise NotImplementedError("Student exercise: calculate the distance between target location and stop position.")
    ###################################################################
    # Given target locations as trial.target_x and trial.target_y,
    # and stop locations as trial.pos_x[-1] and trial.pos_y[-1],
    # compute the Euclidean distance between the target and stop locations.
    distance_stoploc_to_target = ...
    print(f'Target distance from the stop location: {np.around(distance_stoploc_to_target, 1)} cm')

    print(f'Steps taken: {px.size - 1}')

    reward_boundary_radius = arg.goal_radius * arg.LINEAR_SCALE
    target_color = reward_c if distance_stoploc_to_target < reward_boundary_radius else unreward_c

    cir1 = Circle(xy=[target_x, target_y], radius=reward_boundary_radius, alpha=0.4, color=target_color, lw=0)
    ax.add_patch(cir1)
    ax.scatter(target_x, target_y, c=target_color, s=5)

    # plot initial position
    ax.scatter(0, 0, c='k', s=20, marker='*')
    ax.text(10, -10, s='Start', fontsize=fontsize)

    fig.tight_layout(pad=0)

Click for solution

Example output:

Solution hint

Holistic agent trajectory

Source
# @title Holistic agent trajectory

with plt.xkcd():
    trial_idx = 21
    trial = holistic_df.iloc[trial_idx]

    fig = plt.figure(figsize=(2.2, 1.7), dpi=200)
    ax = fig.add_subplot(111)

    ax.set_aspect('equal')
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.spines['bottom'].set_visible(False)
    ax.spines['left'].set_visible(False)
    ax.axes.xaxis.set_ticks([]); ax.axes.yaxis.set_ticks([])

    # plot trajectory
    px = trial.pos_x; py = trial.pos_y
    ax.plot(px, py, lw=lw, c=holistic_c)

    # plot target
    target_x = trial.target_x; target_y = trial.target_y
    print(f'Target distance from the start location: {np.around(trial.target_r, 1)} cm')

    distance_stoploc_to_target = np.sqrt((trial.target_x - trial.pos_x[-1])**2
                                         + (trial.target_y - trial.pos_y[-1])**2)
    print(f'Target distance from the stop location: {np.around(distance_stoploc_to_target, 1)} cm')

    print(f'Steps taken: {px.size - 1}')

    reward_boundary_radius = arg.goal_radius * arg.LINEAR_SCALE
    target_color = reward_c if distance_stoploc_to_target < reward_boundary_radius else unreward_c

    cir1 = Circle(xy=[target_x, target_y], radius=reward_boundary_radius, alpha=0.4, color=target_color, lw=0)
    ax.add_patch(cir1)
    ax.scatter(target_x, target_y, c=target_color, s=5)

    # plot initial position
    ax.scatter(0, 0, c='k', s=20, marker='*')
    ax.text(10, -10, s='Start', fontsize=fontsize)

    fig.tight_layout(pad=0)

Coding Exercise 4 Discussion

  1. Is there any difference between the trajectories for the modular and holistic agents? If so, what does it imply?

Activity: Comparing performance of agents

Agent trajectories across trials

We can also visualize multiple trials together. Here, we visualize trajectories steering towards 500 targets.

Modular agent’s trajectories

Source
# @title Modular agent's trajectories

target_idexes = np.arange(0, 500)
df = modular_df

fig = plt.figure(figsize=(2.2, 1.7), dpi=200)
ax = fig.add_subplot(111)
ax.set_title("Modular agent's trajectories", fontsize=fontsize, fontweight='bold')
ax.set_aspect('equal')
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['bottom'].set_visible(False)
ax.spines['left'].set_visible(False)
ax.axes.xaxis.set_ticks([]); ax.axes.yaxis.set_ticks([])
ax.set_xlim([-235, 235]); ax.set_ylim([-2, 430])

ax.plot(np.linspace(0, 230 + 7), np.tan(np.deg2rad(55)) * np.linspace(0, 230 + 7) - 10,
        c='k', ls=(0, (1, 1)), lw=lw)
for _, trial in df.iloc[target_idexes].iterrows():
    ax.plot(trial.pos_x, trial.pos_y, c=modular_c, lw=0.3, ls='-', alpha=0.2)

reward_idexes = df.rewarded.iloc[target_idexes].values
for label, mask, c in zip(['Rewarded', 'Unrewarded'], [reward_idexes, ~reward_idexes],
                          [reward_c, unreward_c]):
    ax.scatter(*df.iloc[target_idexes].loc[mask, ['target_x', 'target_y']].values.T,
               c=c, marker='o', s=1, lw=1.5, label=label)

x_temp = np.linspace(-235, 235)
ax.plot(x_temp, np.sqrt(420**2 - x_temp**2), c='k', ls=(0, (1, 1)), lw=lw)
ax.text(-10, 425, s=r'70$\degree$', fontsize=fontsize)
ax.text(130, 150, s=r'400 cm', fontsize=fontsize)

ax.plot(np.linspace(-230, -130), np.linspace(0, 0), c='k', lw=lw)
ax.plot(np.linspace(-230, -230), np.linspace(0, 100), c='k', lw=lw)
ax.text(-230, 100, s=r'100 cm', fontsize=fontsize)
ax.text(-130, 0, s=r'100 cm', fontsize=fontsize)

ax.legend(fontsize=fontsize, frameon=False, loc=[0.56, 0.0],
          handletextpad=-0.5, labelspacing=0, ncol=1, columnspacing=1)

fig.tight_layout(pad=0)

Holistic agent’s trajectories

Source
# @title Holistic agent's trajectories

target_idexes = np.arange(0, 500)
df = holistic_df

fig = plt.figure(figsize=(2.2, 1.7), dpi=200)
ax = fig.add_subplot(111)
ax.set_title("Holistic agent's trajectories", fontsize=fontsize, fontweight='bold')
ax.set_aspect('equal')
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['bottom'].set_visible(False)
ax.spines['left'].set_visible(False)
ax.axes.xaxis.set_ticks([]); ax.axes.yaxis.set_ticks([])
ax.set_xlim([-235, 235]); ax.set_ylim([-2, 430])

ax.plot(np.linspace(0, 230 + 7), np.tan(np.deg2rad(55)) * np.linspace(0, 230 + 7) - 10,
        c='k', ls=(0, (1, 1)), lw=lw)
for _, trial in df.iloc[target_idexes].iterrows():
    ax.plot(trial.pos_x, trial.pos_y, c=holistic_c, lw=0.3, ls='-', alpha=0.2)

reward_idexes = df.rewarded.iloc[target_idexes].values
for label, mask, c in zip(['Rewarded', 'Unrewarded'], [reward_idexes, ~reward_idexes],
                          [reward_c, unreward_c]):
    ax.scatter(*df.iloc[target_idexes].loc[mask, ['target_x', 'target_y']].values.T,
               c=c, marker='o', s=1, lw=1.5, label=label)

x_temp = np.linspace(-235, 235)
ax.plot(x_temp, np.sqrt(420**2 - x_temp**2), c='k', ls=(0, (1, 1)), lw=lw)
ax.text(-10, 425, s=r'70$\degree$', fontsize=fontsize)
ax.text(130, 150, s=r'400 cm', fontsize=fontsize)

ax.plot(np.linspace(-230, -130), np.linspace(0, 0), c='k', lw=lw)
ax.plot(np.linspace(-230, -230), np.linspace(0, 100), c='k', lw=lw)
ax.text(-230, 100, s=r'100 cm', fontsize=fontsize)
ax.text(-130, 0, s=r'100 cm', fontsize=fontsize)

ax.legend(fontsize=fontsize, frameon=False, loc=[0.56, 0.0],
          handletextpad=-0.5, labelspacing=0, ncol=1, columnspacing=1)

fig.tight_layout(pad=0)

Discussion

  1. Is there any difference between the two agents’ trajectories?

Agents trained with multiple random seeds

It is well known that an RL agent’s performance can vary significantly with the random seed, so no conclusions can be drawn from a single training run. To make convincing conclusions, we must repeat the experiment across different random initializations and confirm that the result holds robustly across all of them.

Both agents were trained across 8 random seeds. All of them were evaluated using the same sample of 1,000 targets.

Let’s load this saved trajectory data.

Load agents

Source
# @title Load agents
with open('modular_dfs.pkl', 'rb') as file:
    modular_dfs = pickle.load(file)

with open('holistic_dfs.pkl', 'rb') as file:
    holistic_dfs = pickle.load(file)

Reward fraction comparison

We first compute the fraction of rewarded trials in the total 1,000 trials for all training runs with different random seeds for the modular and holistic agents. We visualize this using a bar plot, with each red dot denoting the performance of a random seed.

Reward fraction comparison

Source
# @title Reward fraction comparison

with plt.xkcd():
    xticks = [0, 1]; xticklabels = ['Modular', 'Holistic']
    yticks = [0.9, 0.95, 1]

    fig = plt.figure(figsize=(2.2, 1.7), dpi=200)
    ax = fig.add_subplot(1, 1, 1)
    ax.set_title('Reward fraction comparison', fontsize=fontsize, fontweight='bold')
    ax.spines['top'].set_visible(False); ax.spines['right'].set_visible(False)
    plt.xticks(xticks, xticklabels, fontsize=fontsize)
    plt.yticks(yticks, fontsize=fontsize)
    ax.set_xlabel(r'', fontsize=fontsize + 1)
    ax.set_ylabel('Reward fraction', fontsize=fontsize + 1)
    ax.set_xlim(xticks[0] - 0.3, xticks[-1] + 0.3)
    ax.set_ylim(yticks[0], yticks[-1])
    ax.xaxis.set_label_coords(0.5, -0.15)
    ax.yaxis.set_label_coords(-0.13, 0.5)
    ax.yaxis.set_major_formatter(major_formatter)

    for (idx, dfs), c in zip(enumerate([modular_dfs, holistic_dfs]), [modular_c, holistic_c]):
        ydata = [df.rewarded.sum() / len(df) for df in dfs]
        ax.bar(idx, np.mean(ydata), width=0.7, color=c, alpha=1, zorder=0)
        ax.scatter([idx] * len(ydata), ydata, c='crimson', marker='.',
                   s=10, lw=0.5, zorder=1, clip_on=False)

    fig.tight_layout(pad=0.1, rect=(0, 0, 1, 1))

Time spent comparison

Despite similar performance as measured by reward fraction, we did observe qualitative differences in the two agents’ trajectories in the previous sections. It is possible that the holistic agent’s more curved trajectories, although they reach the target, are less efficient, i.e., they waste more time.

Therefore, we also plot the time spent by both agents for the same 1,000 targets.
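
The time spent is computed from the stored trajectories: a trial whose trajectory records $n$ positions took $n - 1$ steps, and each step lasts one simulation timestep (`arg.DT` in the plotting code below). A minimal sketch with hypothetical trajectories and an assumed timestep:

```python
DT = 0.1  # assumed simulation timestep (s); the tutorial uses arg.DT

# Hypothetical trajectories: each list holds the positions visited in one
# trial, so a trial with n positions took n - 1 steps.
trajectories = [[0, 10, 20, 30], [0, 10, 20]]  # 3 + 2 = 5 steps in total

total_steps = sum(len(t) - 1 for t in trajectories)
time_spent_h = total_steps * DT / 3600  # convert seconds to hours
print(time_spent_h)
```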

Time spent comparison

Source
# @title Time spent comparison

with plt.xkcd():
    xticks = [0, 1]; xticklabels = ['Modular', 'Holistic']
    yticks = [0.35, 0.45, 0.55]

    fig = plt.figure(figsize=(2.2, 1.7), dpi=200)
    ax = fig.add_subplot(1, 1, 1)
    ax.set_title('Time spent comparison', fontsize=fontsize, fontweight='bold')
    ax.spines['top'].set_visible(False); ax.spines['right'].set_visible(False)
    plt.xticks(xticks, xticklabels, fontsize=fontsize)
    plt.yticks(yticks, fontsize=fontsize)
    ax.set_xlabel(r'', fontsize=fontsize + 1)
    ax.set_ylabel('Time spent (h)', fontsize=fontsize + 1)
    ax.set_xlim(xticks[0] - 0.3, xticks[-1] + 0.3)
    ax.set_ylim(yticks[0], yticks[-1])
    ax.xaxis.set_label_coords(0.5, -0.15)
    ax.yaxis.set_label_coords(-0.16, 0.5)
    ax.yaxis.set_major_formatter(major_formatter)

    for (idx, dfs), c in zip(enumerate([modular_dfs, holistic_dfs]), [modular_c, holistic_c]):
        ydata = [(np.hstack(df.pos_x).size - len(df)) * arg.DT / 3600 for df in dfs]
        ax.bar(idx, np.mean(ydata), width=0.7, color=c, alpha=1, zorder=0)
        ax.scatter([idx] * len(ydata), ydata, c='crimson', marker='.',
                   s=10, lw=0.5, zorder=1, clip_on=False)

    fig.tight_layout(pad=0.1, rect=(0, 0, 1, 1))

Discussion

  1. Which agent’s behavior is more desired in the RL framework? Why?

Hint: Consider the objective functions for the critic and actor. The discount factor $\gamma$ used in training is smaller than 1.
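
To make the hint concrete: with discounting, a reward of 1 obtained at step $t$ contributes $\gamma^t$ to the return, so the same reward is worth more when reached sooner. A sketch with a hypothetical discount factor (the value used in training may differ):

```python
GAMMA = 0.97  # hypothetical discount factor < 1

def discounted_value(reward_time, gamma=GAMMA):
    # A reward of 1 delivered at step t is worth gamma**t from the start.
    return gamma ** reward_time

print(discounted_value(10))  # target reached quickly
print(discounted_value(30))  # same reward, reached later: smaller return
```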

Training curve

So far, we have only tested the agents after training. We can also visualize both agents’ performance curves over the course of training.

During training, the agent’s performance (fraction of rewarded trials) was evaluated after every 500 training trials. These data were saved; we’ll load them back in for visualization.

Load training curves

Source
# @title Load training curves

training_curve_path = Path('training_curve')
holistic_curves = [pd.read_csv(file) for file in training_curve_path.glob(f'holistic*')]
modular_curves = [pd.read_csv(file) for file in training_curve_path.glob(f'modular*')]

Let us plot the training curves for both agents. The shaded area denotes the standard error of the mean across all random seeds.
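
The shading uses the standard error of the mean: the sample standard deviation across seeds divided by $\sqrt{n}$. The plotting code gets this from `scipy.stats` (imported as `sem`); here is a stdlib-only sketch of the same quantity for a single list of per-seed values:

```python
from math import sqrt
from statistics import stdev

def sem_1d(values):
    # Standard error of the mean: sample standard deviation (ddof=1) / sqrt(n).
    return stdev(values) / sqrt(len(values))

print(sem_1d([1.0, 2.0, 3.0, 4.0]))
```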

Visualize training curves

Source
# @title Visualize training curves

mean_holistic_curves = np.vstack([v.reward_fraction for v in holistic_curves]).mean(axis=0)
sem_holistic_curves = sem(np.vstack([v.reward_fraction for v in holistic_curves]), axis=0)
mean_modular_curves = np.vstack([v.reward_fraction for v in modular_curves]).mean(axis=0)
sem_modular_curves = sem(np.vstack([v.reward_fraction for v in modular_curves]), axis=0)

xaxis_scale = int(1e4)
yticks = np.around(np.linspace(0, 1, 6), 1)
xticks = np.around(np.linspace(0, 1.5e5, 4), 1)
xticklabels = [my_tickformatter(i, None) for i in xticks / xaxis_scale]

with plt.xkcd():
    fig = plt.figure(figsize=(2.2, 1.7), dpi=200)
    ax = fig.add_subplot(1, 1, 1)
    ax.set_title('Reward fraction during training', fontsize=fontsize, fontweight='bold')
    ax.spines['top'].set_visible(False); ax.spines['right'].set_visible(False)
    plt.xticks(xticks, xticklabels, fontsize=fontsize)
    plt.yticks(yticks, fontsize=fontsize)
    ax.set_xlabel(r'Training trial ($\times$10$^4$)', fontsize=fontsize + 1)
    ax.set_ylabel('Reward fraction', fontsize=fontsize + 1)
    ax.set_xlim(xticks[0], xticks[-1])
    ax.set_ylim(yticks[0], yticks[-1])
    ax.xaxis.set_label_coords(0.5, -0.3)
    ax.yaxis.set_label_coords(-0.13, 0.5)
    ax.yaxis.set_major_formatter(major_formatter)

    xdata = holistic_curves[0].episode.values
    for ymean, ysem, color, label in zip([mean_holistic_curves, mean_modular_curves],
                                         [sem_holistic_curves, sem_modular_curves],
                                         [holistic_c, modular_c],
                                         ['Holistic agent', 'Modular agent']):
        ax.plot(xdata, ymean, lw=lw, clip_on=False, c=color, label=label)
        ax.fill_between(xdata, ymean - ysem, ymean + ysem, edgecolor='None',
                        facecolor=color, alpha=0.4, clip_on=True)

    ax.legend(fontsize=fontsize, frameon=False, loc=[0.26, 0.1],
              handletextpad=0.5, labelspacing=0.2, ncol=1, columnspacing=1)

    fig.tight_layout(pad=0.3, w_pad=0.5, rect=(0, 0, 1, 1))

Conclusion for the training task: The modular agent learned faster and developed more efficient behaviors than the holistic agent.


Section 3: A novel gain task

Estimated timing to here from start of tutorial: 50 minutes

The previous task used a fixed joystick gain, so the mapping from actions to linear and angular velocities never changed. We will now look at a novel task that tests these models’ generalization by changing this gain between training and testing. Will the agents generalize well?

Video 7: Novel Task

Youtube
Bilibili

A good architecture should not only learn quickly but also generalize well.

One crucial parameter in our task is the joystick gain, which linearly maps motor actions (dimensionless, bounded in $[-1, 1]$) to corresponding velocities in the environment, i.e., $\text{velocity} = \text{gain} \cdot \text{action}$ (see the environment dynamics above). During training, the gain remains fixed at 200 cm/s and $90^{\circ}$/s for the linear and angular components, referred to as the $1\times$ gain.

To test the agents’ generalization abilities, here we increase the gain to $2\times$. This means the maximum velocities in the environment are doubled, to 400 cm/s and $180^{\circ}$/s. Note that agents were only exposed to the $1\times$ gain during training. Therefore, if they produce the same sequence of actions as in training under the $2\times$ gain, they will overshoot the targets.
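
To see why overshooting happens, here is a minimal sketch of the dynamics $\text{velocity} = \text{gain} \cdot \text{action}$, using the 200 cm/s linear gain from the text but an assumed timestep and a hypothetical action sequence: replaying actions tuned for the $1\times$ gain under the $2\times$ gain doubles the distance traveled.

```python
DT = 0.1          # assumed integration timestep (s)
GAIN_1X = 200.0   # linear gain during training (cm/s)

# A hypothetical action sequence that lands exactly on a 100 cm target at 1x gain.
actions = [0.5] * 10

def distance_traveled(actions, gain):
    # Integrate velocity = gain * action over the timesteps.
    return sum(gain * a * DT for a in actions)

print(distance_traveled(actions, GAIN_1X))      # lands on the target
print(distance_traveled(actions, 2 * GAIN_1X))  # same actions travel twice as far
```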

Let us evaluate the modular and holistic agents on the same 1,000 sampled targets again, but now with the $2\times$ gain.

Change in gain
gain_factor = 2

Load & evaluate holistic and modular agents

Source
# @title Load & evaluate holistic and modular agents
modular_agent = Agent(arg, ModularActor)
modular_agent.load(Path('agents/modular'), 0)
modular_dfs_2x = evaluation(modular_agent, gain_factor=gain_factor)

holistic_agent = Agent(arg, HolisticActor)
holistic_agent.load(Path('agents/holistic'), 0)
holistic_dfs_2x = evaluation(holistic_agent, gain_factor=gain_factor)

Let us first visualize the modular agent’s trajectories under the $2\times$ gain.

Modular agent’s trajectories

Source
# @title Modular agent's trajectories

target_idexes = np.arange(0, 500)
df = modular_dfs_2x

fig = plt.figure(figsize=(2.2, 1.7), dpi=200)
ax = fig.add_subplot(111)
ax.set_title("Modular agent's trajectories\nwith " +
             r'$2\times$gain', fontsize=fontsize, fontweight='bold')
ax.set_aspect('equal')
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['bottom'].set_visible(False)
ax.spines['left'].set_visible(False)
ax.axes.xaxis.set_ticks([]); ax.axes.yaxis.set_ticks([])
ax.set_xlim([-235, 235]); ax.set_ylim([-2, 430])

ax.plot(np.linspace(0, 230 + 7), np.tan(np.deg2rad(55)) * np.linspace(0, 230 + 7) - 10,
        c='k', ls=(0, (1, 1)), lw=lw)
for _, trial in df.iloc[target_idexes].iterrows():
    ax.plot(trial.pos_x, trial.pos_y, c=modular_c, lw=0.3, ls='-', alpha=0.2)

reward_idexes = df.rewarded.iloc[target_idexes].values
for label, mask, c in zip(['Rewarded', 'Unrewarded'], [reward_idexes, ~reward_idexes],
                          [reward_c, unreward_c]):
    ax.scatter(*df.iloc[target_idexes].loc[mask, ['target_x', 'target_y']].values.T,
               c=c, marker='o', s=1, lw=1.5, label=label)

x_temp = np.linspace(-235, 235)
ax.plot(x_temp, np.sqrt(420**2 - x_temp**2), c='k', ls=(0, (1, 1)), lw=lw)
ax.text(-10, 425, s=r'70$\degree$', fontsize=fontsize)
ax.text(130, 150, s=r'400 cm', fontsize=fontsize)

ax.plot(np.linspace(-230, -130), np.linspace(0, 0), c='k', lw=lw)
ax.plot(np.linspace(-230, -230), np.linspace(0, 100), c='k', lw=lw)
ax.text(-230, 100, s=r'100 cm', fontsize=fontsize)
ax.text(-130, 0, s=r'100 cm', fontsize=fontsize)

ax.text(-210, 30, s="Modular agent's\ntrajectories", fontsize=fontsize - 0.5, c=modular_c)

ax.legend(fontsize=fontsize, frameon=False, loc=[0.56, 0.0],
          handletextpad=-0.5, labelspacing=0, ncol=1, columnspacing=1)

fig.tight_layout(pad=0)

Let us now visualize the holistic agent’s trajectories under the $2\times$ gain.

Holistic agent’s trajectories

Source
# @title Holistic agent's trajectories

target_idexes = np.arange(0, 500)
df = holistic_dfs_2x

fig = plt.figure(figsize=(2.2, 1.7), dpi=200)
ax = fig.add_subplot(111)
ax.set_title("Holistic agent's trajectories\nwith " +
             r'$2\times$gain', fontsize=fontsize, fontweight='bold')
ax.set_aspect('equal')
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['bottom'].set_visible(False)
ax.spines['left'].set_visible(False)
ax.axes.xaxis.set_ticks([]); ax.axes.yaxis.set_ticks([])
ax.set_xlim([-235, 235]); ax.set_ylim([-2, 430])

ax.plot(np.linspace(0, 230 + 7), np.tan(np.deg2rad(55)) * np.linspace(0, 230 + 7) - 10,
        c='k', ls=(0, (1, 1)), lw=lw)
for _, trial in df.iloc[target_idexes].iterrows():
    ax.plot(trial.pos_x, trial.pos_y, c=holistic_c, lw=0.3, ls='-', alpha=0.2)

reward_idexes = df.rewarded.iloc[target_idexes].values
for label, mask, c in zip(['Rewarded', 'Unrewarded'], [reward_idexes, ~reward_idexes],
                          [reward_c, unreward_c]):
    ax.scatter(*df.iloc[target_idexes].loc[mask, ['target_x', 'target_y']].values.T,
               c=c, marker='o', s=1, lw=1.5, label=label)


x_temp = np.linspace(-235, 235)
ax.plot(x_temp, np.sqrt(420**2 - x_temp**2), c='k', ls=(0, (1, 1)), lw=lw)
ax.text(-10, 425, s=r'70$\degree$', fontsize=fontsize)
ax.text(130, 150, s=r'400 cm', fontsize=fontsize)

ax.plot(np.linspace(-230, -130), np.linspace(0, 0), c='k', lw=lw)
ax.plot(np.linspace(-230, -230), np.linspace(0, 100), c='k', lw=lw)
ax.text(-230, 100, s=r'100 cm', fontsize=fontsize)
ax.text(-130, 0, s=r'100 cm', fontsize=fontsize)

ax.text(-210, 30, s="Holistic agent's\ntrajectories", fontsize=fontsize - 0.5, c=holistic_c)

ax.legend(fontsize=fontsize, frameon=False, loc=[0.56, 0.0],
          handletextpad=-0.5, labelspacing=0, ncol=1, columnspacing=1)

fig.tight_layout(pad=0)

Discussion

  1. Do the two agents have the same generalization abilities?

Again, let us load the saved $2\times$-gain evaluation data for agents trained with all 8 random seeds.

Load data

Source
# @title Load data
with open('modular_dfs_2x.pkl', 'rb') as file:
    modular_dfs_2x = pickle.load(file)

with open('holistic_dfs_2x.pkl', 'rb') as file:
    holistic_dfs_2x = pickle.load(file)

Now, let’s compare the reward fraction under the $2\times$ gain.

Reward fraction comparison

Source
# @title Reward fraction comparison

with plt.xkcd():
    xticks = [0, 1]; xticklabels = ['Modular', 'Holistic']
    yticks = [0, 0.5, 1]

    fig = plt.figure(figsize=(2.2, 1.7), dpi=200)
    ax = fig.add_subplot(111)
    ax.set_title("Reward fraction comparison\nwith " +
                 r'$2\times$gain', fontsize=fontsize, fontweight='bold')
    ax.spines['top'].set_visible(False); ax.spines['right'].set_visible(False)
    plt.xticks(xticks, xticklabels, fontsize=fontsize)
    plt.yticks(yticks, fontsize=fontsize)
    ax.set_xlabel(r'', fontsize=fontsize + 1)
    ax.set_ylabel('Reward fraction', fontsize=fontsize + 1)
    ax.set_xlim(xticks[0] - 0.3, xticks[-1] + 0.3)
    ax.set_ylim(yticks[0], yticks[-1])
    ax.xaxis.set_label_coords(0.5, -0.15)
    ax.yaxis.set_label_coords(-0.13, 0.5)
    ax.yaxis.set_major_formatter(major_formatter)

    for (idx, dfs), c in zip(enumerate([modular_dfs_2x, holistic_dfs_2x]), [modular_c, holistic_c]):
        ydata = [df.rewarded.sum() / len(df) for df in dfs]
        ax.bar(idx, np.mean(ydata), width=0.7, color=c, alpha=1, zorder=0)
        ax.scatter([idx] * len(ydata), ydata, c='crimson', marker='.',
                   s=10, lw=0.5, zorder=1, clip_on=False)

    fig.tight_layout(pad=0.1, rect=(0, 0, 1, 1))

Let us also compare traveled distance against target distance under the $2\times$ gain for the two agents.

Traveled distance comparison

Source
# @title Traveled distance comparison
with plt.xkcd():
    xticks = np.array([0, 250, 500])
    yticks = xticks

    fig = plt.figure(figsize=(2, 2), dpi=200)
    ax = fig.add_subplot(111)
    ax.set_title("Traveled distance comparison\nwith " +
                 r'$2\times$gain', fontsize=fontsize, fontweight='bold')
    ax.set_aspect('equal')
    ax.spines['top'].set_visible(False); ax.spines['right'].set_visible(False)
    plt.xticks(xticks, fontsize=fontsize)
    plt.yticks(yticks, fontsize=fontsize)
    ax.set_xlabel('Target distance (cm)', fontsize=fontsize + 1)
    ax.set_ylabel('Traveled distance (cm)', fontsize=fontsize + 1)
    ax.set_xlim(xticks[0], xticks[-1])
    ax.set_ylim(yticks[0], yticks[-1])
    ax.xaxis.set_label_coords(0.5, -0.25)
    ax.yaxis.set_label_coords(-0.25, 0.5)
    ax.yaxis.set_major_formatter(major_formatter)

    xdata_m = np.hstack([df.target_r for df in modular_dfs_2x])
    ydata_m = np.hstack([df.pos_r_end for df in modular_dfs_2x])
    xdata_h = np.hstack([df.target_r for df in holistic_dfs_2x])
    ydata_h = np.hstack([df.pos_r_end for df in holistic_dfs_2x])

    sampled_trial_idx = np.random.choice(np.arange(ydata_m.size), size=1000, replace=False)

    ax.scatter(xdata_m[sampled_trial_idx], ydata_m[sampled_trial_idx], c=modular_c, s=1, alpha=0.1)
    ax.scatter(xdata_h[sampled_trial_idx], ydata_h[sampled_trial_idx], c=holistic_c, s=1, alpha=0.1)

    model = LinearRegression(fit_intercept=False)
    model.fit(xdata_m.reshape(-1, 1), ydata_m)
    ax.plot(xticks, model.predict(xticks.reshape(-1, 1)), c=modular_c,
            ls='-', lw=lw*2, label='Modular')

    model = LinearRegression(fit_intercept=False)
    model.fit(xdata_h.reshape(-1, 1), ydata_h)
    ax.plot(xticks, model.predict(xticks.reshape(-1, 1)), c=holistic_c,
            ls='-', lw=lw*2, label='Holistic')

    ax.plot(xticks, yticks, c='k', ls='--', lw=lw)

    ax.legend(fontsize=fontsize, frameon=False, loc=[0.45, 0.1],
              handletextpad=0.5, labelspacing=0.2, ncol=1, columnspacing=1)

    fig.tight_layout(pad=0.1, rect=(0, 0, 1, 1))

Discussion:

  1. What have you found? Which agent generalized better?
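One way to make this comparison quantitative is the slope of the no-intercept regression fit in the plot above: a slope near 1 means the agent stops on target, while a slope above 1 means it overshoots. As a minimal sketch, the closed-form least-squares slope through the origin (equivalent to `LinearRegression(fit_intercept=False)` on one feature) is:

```python
import numpy as np

def overshoot_slope(target_dist, traveled_dist):
    """Least-squares slope of traveled vs. target distance through the
    origin (same fit as LinearRegression(fit_intercept=False)).
    ~1.0: stops on target; >1.0: overshoots; <1.0: undershoots."""
    x = np.asarray(target_dist, dtype=float)
    y = np.asarray(traveled_dist, dtype=float)
    # argmin_b ||y - b x||^2  has the closed form  b = (x . y) / (x . x)
    return float(x @ y) / float(x @ x)
```

For example, `overshoot_slope([100, 200, 300], [150, 300, 450])` returns 1.5, i.e., a consistent 50% overshoot.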

Decoding analysis

Although we have confirmed that the modular agent exhibited better generalization performance in the gain task, the underlying mechanism remains unclear. One reasonable hypothesis is that the modular agent holds a robust belief that remains accurate under the 2× gain, and this robust belief can support good actions. Ultimately, agents must know where they are to avoid overshooting.

To test this hypothesis, we recorded the activities of RNN neurons in the agents’ actors, as these neurons are responsible for computing the beliefs underlying actions. Since beliefs should represent the agents’ locations in the environment, we used linear regression (with ℓ2 regularization) to decode agents’ locations from the recorded RNN activities.

We defined the decoding error, representing the Euclidean distance between the true and decoded locations, as an indicator of belief accuracy.
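The helper `fit_decoder` used below is defined earlier in the notebook; as a rough sketch of the idea (not the tutorial's exact implementation), decoding 2D locations from RNN activity with closed-form ridge regression (ℓ2-regularized least squares) and computing the per-step Euclidean decoding error might look like:

```python
import numpy as np

def ridge_decode_error(rnn_activity, positions, alpha=1.0):
    """Fit a ridge decoder mapping RNN activity (T x n_neurons) to 2D
    locations (T x 2), then return the per-step Euclidean distance
    between true and decoded locations -- the decoding error (cm)."""
    X = np.asarray(rnn_activity, dtype=float)
    Y = np.asarray(positions, dtype=float)
    n_features = X.shape[1]
    # Closed-form ridge solution: W = (X^T X + alpha I)^{-1} X^T Y
    W = np.linalg.solve(X.T @ X + alpha * np.eye(n_features), X.T @ Y)
    return np.linalg.norm(Y - X @ W, axis=1)
```

In the actual analysis, the decoder would be fit on training-gain trials and evaluated on both 1× and 2× trials; `ridge_decode_error` and its interface are illustrative names, not the notebook's `fit_decoder`.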

Load data

Source
# @title Load data
modular_decoder_data = [fit_decoder(df) for df in modular_dfs]
modular_decoder_data_2x = [fit_decoder(df) for df in modular_dfs_2x]
modular_decoding_error = np.hstack([np.linalg.norm(v[2] - v[0].predict(v[1]), axis=1) for v
                                    in modular_decoder_data])
modular_decoding_error_2x = np.hstack([np.linalg.norm(v[2] - v[0].predict(v[1]), axis=1) for v
                                       in modular_decoder_data_2x])

holistic_decoder_data = [fit_decoder(df) for df in holistic_dfs]
holistic_decoder_data_2x = [fit_decoder(df) for df in holistic_dfs_2x]
holistic_decoding_error = np.hstack([np.linalg.norm(v[2] - v[0].predict(v[1]), axis=1) for v
                                     in holistic_decoder_data])
holistic_decoding_error_2x = np.hstack([np.linalg.norm(v[2] - v[0].predict(v[1]), axis=1) for v
                                        in holistic_decoder_data_2x])

We plot the distribution of the decoding error for every step in every trial across random seeds for each agent, under 1× or 2× gain.

Decoding error comparison

Source
# @title Decoding error comparison

with plt.xkcd():
    xticks = np.array([0, 1]); xticklabels = ['1x (train)', '2x (test)']
    yticks = np.linspace(0, 120, 5)

    fig = plt.figure(figsize=(2.2, 1.7), dpi=200)
    ax = fig.add_subplot(111)
    ax.set_title('Decoding error comparison', fontsize=fontsize, fontweight='bold')
    ax.spines['top'].set_visible(False); ax.spines['right'].set_visible(False)
    plt.xticks(xticks, xticklabels, fontsize=fontsize)
    plt.yticks(yticks, fontsize=fontsize)
    ax.set_xlabel(r'Gain', fontsize=fontsize + 1)
    ax.set_ylabel('Decoding error (cm)', fontsize=fontsize + 1)
    ax.set_xlim(xticks[0] - 0.2, xticks[-1] + 0.2)
    ax.set_ylim(yticks[0], yticks[-1])
    ax.xaxis.set_label_coords(0.5, -0.15)
    ax.yaxis.set_label_coords(-0.2, 0.5)
    ax.yaxis.set_major_formatter(major_formatter)

    violin1 = ax.violinplot(filter_fliers([modular_decoding_error, modular_decoding_error_2x]),
                            positions=xticks - 0.1, showmeans=True, widths=0.1)
    set_violin_plot(violin1, facecolor=modular_c, edgecolor=modular_c)

    violin2 = ax.violinplot(filter_fliers([holistic_decoding_error, holistic_decoding_error_2x]),
                            positions=xticks + 0.1, showmeans=True, widths=0.1)
    set_violin_plot(violin2, facecolor=holistic_c, edgecolor=holistic_c)

    ax.legend([violin1['bodies'][0], violin2['bodies'][0]], ['Modular', 'Holistic'],
              fontsize=fontsize, frameon=False, loc=[0.3, 0.7],
              handletextpad=0.5, labelspacing=0.2, ncol=1, columnspacing=1)

    fig.tight_layout(pad=0.1, rect=(0, 0, 1, 1))

While both agents demonstrated small decoding errors with the training gain, we observed that the holistic agent, which struggled with generalization at the 2× gain, also exhibited reduced accuracy in determining its own location. This helps explain why the modular agent can generalize better at higher gains.

Conclusion for the gain task: In the gain task, the modular agent exhibited better generalization than the holistic agent, supported by a more accurate internal belief when faced with larger gains.


The Big Picture


In this tutorial, we explored how agents’ performance depends on their neural network architecture. We showed that a modular architecture, with separate modules for learning different aspects of behavior, outperforms a holistic architecture with a single module: its stronger inductive bias lets the agent reach good performance faster and generalize better to related tasks. Intriguingly, this modularity is a property we also observe in the brain, where it may likewise support generalization.


Addendum: Generalization, but no free lunch

The No Free Lunch theorems proved that no inductive bias can excel across all tasks. The paper showed that agents with a modular architecture acquire the underlying structure of the training task, whereas holistic agents tend to acquire different knowledge during training, such as forming beliefs based on unreliable information sources or using less efficient control actions. Because the novel gain task shares the training task’s structure, a modular agent that accurately learns that structure can leverage its knowledge in the novel task.

However, it is worth noting that infinitely many new tasks can be constructed that diverge from the training task’s structure yet align with the ‘inferior’ beliefs and control acquired by holistic agents.

For example, we saw that the holistic agent developed inefficient trajectories with higher curvature, which is detrimental in the training task. But consider a new task with a different reward function, one that prefers more curved trajectories: there, the holistic agent should outperform the modular agent. There is no free lunch.
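To make this thought experiment concrete, here is a purely illustrative toy reward (not from the tutorial) that favors curved trajectories by summing absolute heading changes along a path; a straight path scores zero, so a winding agent would be rewarded over a straight-line one:

```python
import numpy as np

def curvature_reward(xy):
    """Toy reward favoring curved trajectories: total absolute heading
    change along a path of 2D points (T x 2). A straight path scores
    ~0; a winding path scores higher."""
    xy = np.asarray(xy, dtype=float)
    steps = np.diff(xy, axis=0)                     # successive displacements
    headings = np.arctan2(steps[:, 1], steps[:, 0]) # heading of each step
    turns = np.diff(headings)
    # wrap angle differences into (-pi, pi] to handle the arctan2 branch cut
    turns = (turns + np.pi) % (2 * np.pi) - np.pi
    return np.abs(turns).sum()

straight = np.column_stack([np.linspace(0, 10, 50), np.zeros(50)])
t = np.linspace(0, np.pi, 50)
arc = np.column_stack([np.cos(t), np.sin(t)])  # semicircular path
print(curvature_reward(straight) < curvature_reward(arc))  # True
```

Under this reward, the modular agent's near-straight paths, an asset in the original task, would become a liability.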

References
  1. Zhang, R., Pitkow, X., & Angelaki, D. E. (2024). Inductive biases of neural network modularity in spatial navigation. Science Advances, 10(29). 10.1126/sciadv.adk1256