Augmented Random Search (ARS) is a model-free reinforcement learning algorithm and a modification of the Basic Random Search (BRS) algorithm. It was first published in 2018 by the trio Horia Mania, Aurelia Guy, and Benjamin Recht from the University of California, Berkeley. When ARS is compared with other AI algorithms, it is up to 15 times faster and more efficient, returning higher rewards when applied to specific applications such as the locomotion tasks in the MuJoCo (Multi-Joint dynamics with Contact) environment; this makes it a jewel of an algorithm.
The Basic Random Search algorithm relies on a stochastic process, that is, a collection of random variables indexed by a mathematical set; these variables are the values observed in the course of the system's process.
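To build some intuition for this kind of stochastic search, here is a minimal, self-contained sketch. The quadratic objective below is a made-up stand-in, not part of the ARS code in this article: at every step a random perturbation is sampled, and it is kept only if it improves the objective.
import numpy as np

def objective(theta):
    # Made-up objective for illustration only: higher is better, maximum at [1, 2]
    return -np.sum((theta - np.array([1.0, 2.0])) ** 2)

theta = np.zeros(2)
for step in range(2000):
    candidate = theta + 0.1 * np.random.randn(2)   # random perturbation (the random variable at this step)
    if objective(candidate) > objective(theta):    # keep the perturbation only if it improves the objective
        theta = candidate
print(theta)                                       # ends up close to [1, 2]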
The Idea:
The basic idea behind every reinforcement learning algorithm is to find the best policy, the one that maximizes the reward an agent collects while applying that policy in the environment during an episode. Maximizing the reward means we are actually optimizing the process: we are trying to find the value(s) of the policy parameters that return the best result, i.e. the maximum of the objective function.
Augmented versus Basic Random Search:
Basic Random Search: this search does its exploration in the action space; what this means is that, for every action taken and reward returned, an analysis is drawn based on that single action. It employs the gradient descent algorithm and uses deep learning.
Augmented Random Search (ARS): ARS does its exploration in the policy space, meaning that it focuses its analysis on the whole sequence of actions that resulted in positive rewards, so as to establish and reinforce the same policy afterwards. In the policy space, the directions that yield the lowest rewards are discarded. ARS employs the method of finite differences: the difference between the two rewards obtained in opposite directions when we apply small perturbations (little random values) to the weighted inputs. The equation for the method of finite differences is: f'(a) ≈ (f(a + h) - f(a)) / h
Finally, ARS uses shallow learning as its model: a single perceptron does the logic.
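To make the finite-difference idea concrete, here is a small, self-contained check; the function f below is an arbitrary example chosen for illustration, not part of ARS itself.
def f(a):
    return a ** 2                        # example function; the true derivative is 2a

a, h = 3.0, 1e-4
print((f(a + h) - f(a)) / h)             # ≈ 6.0001, close to the true derivative f'(3) = 6

# ARS applies the same idea to rewards, using the two-sided difference
# reward(weights + noise*delta) - reward(weights - noise*delta) for each perturbation delta.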
In the ARS setup, the agent learns from the environment, gets a reward based on its performance in the environment, then adjusts the weighted inputs relative to the reward it got. By doing this, the agent tries not to repeat its previous mistakes and instead steers subsequent actions toward those that brought favourable rewards, with the help of the perceptron. The environment gives a reward for every agent movement/action, while the perceptron uses the aggregated rewards to adjust the weighted inputs.
The Perceptron:
class Policy():

    def __init__(self, input_size, output_size):
        # Weight matrix of the perceptron, initialized to zeros
        self.theta = np.zeros((output_size, input_size))

    def evaluate(self, input, delta = None, direction = None):
        if direction is None:
            # No perturbation: plain weighted sum of the inputs
            return self.theta.dot(input)
        elif direction == "positive":
            # Weights perturbed in the positive direction
            return (self.theta + hp.noise*delta).dot(input)
        else:
            # Weights perturbed in the negative direction
            return (self.theta - hp.noise*delta).dot(input)
Note: the whole input (from the environment) to the perceptron can be described as a vector of inputs, i.e. a vector of numbers, while the output is essentially the sum of the weighted inputs.
The perceptron takes the inputs from the environment, applies some weights to them, sums them up and divides by the standard deviation, then returns the output back to the environment.
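As a quick illustration with made-up numbers (assuming numpy is imported as np and the hp hyperparameter object from the full listing below is in scope), evaluating the perceptron is just a matrix-vector product:
policy = Policy(input_size = 3, output_size = 2)      # 2x3 weight matrix, initialized to zeros
state = np.array([0.5, -1.0, 2.0])                    # a normalized input vector from the environment

print(policy.evaluate(state))                         # [0. 0.] -> plain weighted sum theta.dot(input)

delta = np.random.randn(2, 3)                         # random perturbation with the same shape as theta
print(policy.evaluate(state, delta, "positive"))      # output with weights theta + hp.noise*delta
print(policy.evaluate(state, delta, "negative"))      # output with weights theta - hp.noise*delta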
Rollouts:
This is the code that performs the rollouts.
all_rewards = np.array(positive_rewards + negative_rewards)
sigma_r = all_rewards.std()
Here all the positive/negative rewards are gathered to compute the standard deviation of these rewards.
scores = {k:max(r_pos,r_neg) for k,(r_pos,r_neg) in enumerate(zip(positive_rewards, negative_rewards))}
order = sorted(scores.keys(), key = lambda x:scores[x], reverse = True)[:hp.nb_best_directions]
rollouts = [(positive_rewards[k], negative_rewards[k], deltas[k]) for k in order]
What happens in the rollout phase is this: we need to gather all the rewards obtained by applying the perturbations in the best positive directions and in the best negative/opposite directions, together with the perturbations themselves, i.e. the reward in the positive direction, the reward in the negative direction, and d = delta.
Note: the reason we need these perturbations is that, in the next step of the program, we will be updating the policy, which requires these perturbations for each of the best directions.
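A toy run of this selection step, with made-up reward values, three directions, and nb_best_directions = 2 (the real code uses hp.nb_best_directions), shows what ends up in rollouts:
positive_rewards = [1.0, 5.0, 2.0]        # made-up rewards in the positive directions
negative_rewards = [4.0, 0.5, 1.5]        # made-up rewards in the negative directions
deltas = ["d0", "d1", "d2"]               # placeholders standing in for the perturbation matrices
nb_best_directions = 2

scores = {k: max(r_pos, r_neg) for k, (r_pos, r_neg) in enumerate(zip(positive_rewards, negative_rewards))}
# scores == {0: 4.0, 1: 5.0, 2: 2.0}
order = sorted(scores.keys(), key = lambda x: scores[x], reverse = True)[:nb_best_directions]
# order == [1, 0] -> the two directions whose best reward is highest
rollouts = [(positive_rewards[k], negative_rewards[k], deltas[k]) for k in order]
# rollouts == [(5.0, 0.5, 'd1'), (1.0, 4.0, 'd0')]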
The next thing is to update the policy.
Updating our policy
policy.update(rollouts, sigma_r)
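Internally, this call applies the finite-difference update shown in the full listing below. Roughly, each selected direction pushes the weights by (r_pos - r_neg) * delta, scaled by the learning rate and normalized by the standard deviation of the rewards (the sketch assumes np, policy, hp, rollouts, and sigma_r are in scope as in the surrounding code):
# Sketch of what policy.update(rollouts, sigma_r) does (see the update method in the full listing)
step = np.zeros(policy.theta.shape)
for r_pos, r_neg, delta in rollouts:
    step += (r_pos - r_neg) * delta
policy.theta += hp.learning_rate / (hp.nb_best_directions * sigma_r) * step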
From here we move to printing out the final rewards after updating the policy.
Printing the final reward of the policy after the update
reward_evaluation = explore(env, normalizer, policy)
print("step:", step,"rewards:" ,reward_evaluation)
At this point, the whole ARS algorithm is done.
So in general, what happens in ARS is as follows:
- We get N input vectors.
- The input vectors are normalized. Normalization here means applying perturbations that are fair to the input vectors, without having too drastic an impact on the output vectors.
- The vectors are weighted and a matrix is formed, which will be used to run an episode.
- The dimension of the matrix is obtained with self.theta.shape.
- The perceptron uses these weighted inputs, with some added random perturbations in the positive and negative directions, to produce an output.
- Each action returns a reward that is either reinforced or discarded.
- A new state is reached based on the initial action and reward.
- The best rewards in the best directions are aggregated and reinforced: the bigger the reward from a specific weight configuration, the bigger its influence on the adjustment of the weights.
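Putting these steps together, one ARS iteration can be sketched as the following condensed version of the train() function from the full listing below (env, policy, normalizer, hp, and explore are assumed to be set up as in that listing):
deltas = policy.sample_deltas()                              # random perturbations, one per direction
r_pos = [explore(env, normalizer, policy, "positive", d) for d in deltas]
r_neg = [explore(env, normalizer, policy, "negative", d) for d in deltas]
sigma_r = np.array(r_pos + r_neg).std()                      # standard deviation of all rewards
best = sorted(range(hp.nb_directions), key = lambda k: max(r_pos[k], r_neg[k]), reverse = True)[:hp.nb_best_directions]
rollouts = [(r_pos[k], r_neg[k], deltas[k]) for k in best]
policy.update(rollouts, sigma_r)                             # reinforce the best directions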
Here is the standard ARS Algorithm that you would likely see online:
Importing the libraries
import os
import numpy as np
import gym
from gym import wrappers
import pybullet_envs
Setting hyper parameters
class Hp():

    def __init__(self):
        self.nb_steps = 1000
        self.episode_length = 1000
        self.learning_rate = 0.02
        self.nb_directions = 16
        self.nb_best_directions = 16
        assert self.nb_best_directions <= self.nb_directions
        self.noise = 0.03
        self.seed = 1
        self.env_name = "HalfCheetahBulletEnv-v0"
Normalizing the states
class Normalizer():

    def __init__(self, nb_inputs):
        self.n = np.zeros(nb_inputs)
        self.mean = np.zeros(nb_inputs)
        self.mean_diff = np.zeros(nb_inputs)
        self.var = np.zeros(nb_inputs)

    def observe(self, x):
        self.n += 1.
        self.last_mean = self.mean.copy()
        self.mean += (x - self.mean) / self.n
        self.mean_diff += (x - self.last_mean) * (x - self.mean)
        self.var = (self.mean_diff / self.n).clip(min = 1e-02)

    def normalize(self, inputs):
        obs_mean = self.mean
        obs_std = np.sqrt(self.var)
        return (inputs - obs_mean) / obs_std
# Building the AI
class Policy():

    def __init__(self, input_size, output_size):
        self.theta = np.zeros((output_size, input_size))

    def evaluate(self, input, delta = None, direction = None):
        if direction is None:
            return self.theta.dot(input)
        elif direction == "positive":
            return (self.theta + hp.noise*delta).dot(input)
        else:
            return (self.theta - hp.noise*delta).dot(input)

    def sample_deltas(self):
        return [np.random.randn(*self.theta.shape) for _ in range(hp.nb_directions)]

    def update(self, rollouts, sigma_r): # sigma_r = standard deviation of the rewards
        step = np.zeros(self.theta.shape)
        for r_pos, r_neg, d in rollouts:
            step += (r_pos - r_neg) * d
        self.theta += hp.learning_rate / (hp.nb_best_directions * sigma_r) * step
        # the learning rate determines how much weight we add to / subtract from the current weights,
        # based on the rewards gotten from the previous actions.
# Exploring the policy on one specific direction and over one episode
def explore(env, normalizer, policy, direction = None, delta = None):
    state = env.reset()
    done = False
    num_plays = 0.
    sum_rewards = 0
    while not done and num_plays < hp.episode_length:
        normalizer.observe(state)
        state = normalizer.normalize(state)
        action = policy.evaluate(state, delta, direction)
        state, reward, done, _ = env.step(action)
        reward = max(min(reward, 1), -1) # clip the reward to the range [-1, 1]: the max reward is capped at 1 and the min at -1
        sum_rewards += reward
        num_plays += 1
    return sum_rewards
# Training the AI
def train(env, policy, normalizer, hp):
    for step in range(hp.nb_steps):
        # initializing the perturbation deltas and the positive/negative rewards
        deltas = policy.sample_deltas()
        positive_rewards = [0] * hp.nb_directions
        negative_rewards = [0] * hp.nb_directions
        # getting the positive rewards in the positive directions
        for k in range(hp.nb_directions):
            positive_rewards[k] = explore(env, normalizer, policy, direction = "positive", delta = deltas[k])
        # getting the negative rewards in the negative directions
        for k in range(hp.nb_directions):
            negative_rewards[k] = explore(env, normalizer, policy, direction = "negative", delta = deltas[k])
        # gathering all the positive/negative rewards to compute the standard deviation of these rewards
        all_rewards = np.array(positive_rewards + negative_rewards)
        sigma_r = all_rewards.std()
        # sorting the rollouts by max(r_pos, r_neg) and selecting the best directions
        # k here represents the keys, i.e. the sixteen directions; r_pos/r_neg are
        # the rewards in those specific directions
        scores = {k:max(r_pos,r_neg) for k,(r_pos,r_neg) in enumerate(zip(positive_rewards, negative_rewards))}
        order = sorted(scores.keys(), key = lambda x:scores[x], reverse = True)[:hp.nb_best_directions] # keys sorted by their best reward, keeping the top nb_best_directions
        rollouts = [(positive_rewards[k], negative_rewards[k], deltas[k]) for k in order]
        # what happened in the rollout phase is this: we needed to get all the rewards obtained by applying the perturbations
        # in the best positive directions and in the best negative/opposite directions, together with the perturbations, i.e. r_pos, r_neg, and d.
        # Note: the reason we need these perturbations is that the next step of the program, updating the policy,
        # requires r_pos, r_neg, and d (delta) for each of the best directions.
        # updating our policy
        policy.update(rollouts, sigma_r)
        # printing the final reward of the policy after the update
        reward_evaluation = explore(env, normalizer, policy)
        print("step:", step, "rewards:", reward_evaluation)
# Running the main code
def mkdir(base, name):
    path = os.path.join(base, name)
    if not os.path.exists(path):
        os.makedirs(path)
    return path
work_dir = mkdir("exp","brs")
monitor_dir = mkdir(work_dir ,"monitor")
hp = Hp()
np.random.seed(hp.seed)
env = gym.make(hp.env_name)
env = wrappers.Monitor(env, monitor_dir, force = True)
nb_inputs = env.observation_space.shape[0]
nb_outputs = env.action_space.shape[0]
policy = Policy(nb_inputs, nb_outputs)
normalizer = Normalizer(nb_inputs)
train(env, policy, normalizer, hp)
With this article at OpenGenus, you must have the complete idea of Augmented Random Search (ARS).