DeepQ Tuning

Moving forward with Deep Q Learning as the model for our reinforcement learning agent, the team shifted efforts toward optimizing that model. Because Deep Q Learning uses images from the emulator environment as input and maps them to the button-press action expected to yield the greatest reward, we identified three areas in which to experiment: image input, action space, and reward structure.

The OpenAI Gym Retro emulator processes images at 60 frames per second (fps). However, we posited that this frame rate is faster than a human could process the image, make a decision, and press a button. Therefore, we did not need to map an input to every frame in the emulator and could take advantage of a reduced frame rate in several ways. First, we modified the input to our Deep Q agent to take 4 consecutive frames concatenated together, which provides richer contextual information to the model, such as direction of travel and momentum. Next, we applied stochastic frame skipping to these images prior to concatenation: rather than seeing 4 directly consecutive frames, the environment may skip up to 4 frames (occasionally repeating the previous action) before capturing the next image. This effectively reduces our decision rate to 15 fps, which better aligns with human button-press times.
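
As a rough illustration, the environment can be wrapped before training roughly as follows. This is a minimal sketch only: it assumes the StochasticFrameSkip wrapper listed later in this section, and the FrameStack wrapper, make_sonic_env helper, and stickprob value here are our own illustrative choices, not part of the project code.

import numpy as np
import gym
import retro
from collections import deque
from wrappers import StochasticFrameSkip   # the wrapper listed below

class FrameStack(gym.Wrapper):
    # Keeps the k most recent frames and concatenates them along the channel axis.
    def __init__(self, env, k=4):
        gym.Wrapper.__init__(self, env)
        self.k = k
        self.frames = deque(maxlen=k)

    def reset(self, **kwargs):
        ob = self.env.reset(**kwargs)
        for _ in range(self.k):
            self.frames.append(ob)
        return np.concatenate(list(self.frames), axis=-1)

    def step(self, action):
        ob, rew, done, info = self.env.step(action)
        self.frames.append(ob)
        return np.concatenate(list(self.frames), axis=-1), rew, done, info

def make_sonic_env():
    # Illustrative helper name; stickprob=0.25 is an assumed value.
    env = retro.make(game='SonicTheHedgehog-Genesis', state='LabyrinthZone.Act1')
    env = StochasticFrameSkip(env, n=4, stickprob=0.25)  # 60 fps / 4 = ~15 decisions per second
    env = FrameStack(env, k=4)                           # 4 stacked frames per observation
    return env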

According to research by Google's DeepMind, Deep Q Learning is optimized for relatively small, discrete action spaces. However, the emulated Sega Genesis controller contains 12 buttons, which can be pressed in 2^12, or 4,096, combinations. Therefore, we opted to reduce the action space to the most viable moves for Sonic to make in the environment, eliminating redundant combinations such as pressing up and down at the same time. Our final action space is reduced to 7 basic actions: stand still, jump, walk right, jump right, walk left, jump left, and crouch/roll.
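
For illustration, this reduced action space can be exposed to the agent as a Discrete(7) space by translating each action index into the 12-element button array the emulator expects. The sketch below uses the ActionSpace class listed later in this section; the SonicDiscretizer wrapper name is our own.

import gym
from action_space import ActionSpace

class SonicDiscretizer(gym.ActionWrapper):
    # Translates a discrete action index (0-6) into a 12-button press array.
    def __init__(self, env):
        gym.ActionWrapper.__init__(self, env)
        self.action_space = gym.spaces.Discrete(ActionSpace.get_n_moves())

    def action(self, act):
        return ActionSpace.move(act)

# Usage: env = SonicDiscretizer(make_sonic_env()); env.step(2) presses A + right (jump right).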

The OpenAI Gym Retro environment provides several reward functions as a baseline to inform the agent of the success of its previous move. These include raw “x position”, indicating how far right Sonic has moved in the environment, and “contest”, which calculates x position relative to level length and adds a bonus for quickly completing a level. The team added two reward functions for experimentation. First, we added a modified version of “contest” that does not penalize backtracking, allowing Sonic to move left to gain momentum for overcoming tall obstacles. Second, we added a complex reward function that rewards relative x position, collecting rings, and eliminating enemies, and penalizes excessive jumping to prioritize forward momentum. Our most successful runs utilized the “backtracking” reward function.
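
Concretely, one of these reward objects supplies the per-step reward inside the training loop, in place of the emulator's built-in reward. The loop below is illustrative only: it reuses the hypothetical make_sonic_env helper from the earlier sketch and substitutes a random move for the agent's choice.

import random
from action_space import ActionSpace
from reward_system import RewardSystem

env = make_sonic_env()                     # hypothetical helper from the earlier sketch
reward_fn = RewardSystem.Backtracking()    # our most successful reward function

ob = env.reset()
ob, _, done, info = env.step(ActionSpace.stand_still())  # one no-op step to obtain the initial info dict
reward_fn.init(info)                                     # seed the reward with the starting game state

while not done:
    buttons = ActionSpace.move(random.randrange(ActionSpace.get_n_moves()))  # stand-in for the agent's choice
    ob, _, done, info = env.step(buttons)
    rew = reward_fn.calc_reward(info, buttons)           # reward only for new rightmost progress
    # a DQN agent would store (ob, buttons, rew, done) in its replay buffer and learn here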

Our Code:

/source/interface/wrappers.py

import numpy as np
import gym

# Repeats the chosen action for n emulator frames, with probability stickprob of
# keeping the previous action for the first frame of each group. Rewards from the
# skipped frames are summed into a single return value.
class StochasticFrameSkip(gym.Wrapper):
    def __init__(self, env, n, stickprob):
        gym.Wrapper.__init__(self, env)
        self.n = n
        self.stickprob = stickprob
        self.curac = None
        self.rng = np.random.RandomState()
        self.supports_want_render = hasattr(env, "supports_want_render")

    def reset(self, **kwargs):
        self.curac = None
        return self.env.reset(**kwargs)

    def step(self, ac):
        done = False
        totrew = 0
        for i in range(self.n):
            # First step after reset, use action
            if self.curac is None:
                self.curac = ac
            # First substep, delay with probability=stickprob
            elif i == 0:
                if self.rng.rand() > self.stickprob:
                    self.curac = ac
            # Second substep, new action definitely kicks in
            elif i == 1:
                self.curac = ac
            if self.supports_want_render and i < self.n - 1:
                ob, rew, done, info = self.env.step(self.curac, want_render=False)
            else:
                ob, rew, done, info = self.env.step(self.curac)
            totrew += rew
            if done:
                break
        return ob, totrew, done, info

/source/interface/action_space.py

# Represents all possible moves.
# Converts moves to button presses which can be used with env.step()
#
# ex:
#   # returns an array of 12 ints representing the button presses for this action
#   buttons = ActionSpace.move_right()
#
#   env = retro.make(game='SonicTheHedgehog-Genesis', state='LabyrinthZone.Act1')
#   env.step(buttons)   # sends move to game emulator
#
# ActionSpace is treated like a namespace. It is not intended to be instantiated.

class ActionSpace:
    # --------------------------------- FIELDS --------------------------------

    # Index of each possible move.
    # These values are constant and should not be changed at runtime.
    STAND_STILL = 0
    RIGHT = 1
    JUMP_RIGHT = 2
    JUMP = 3
    JUMP_LEFT = 4
    LEFT = 5
    CROUCH = 6
    ROLL = CROUCH
    # TODO: Do we need one for spin dash?

    # Lookup table which maps an action index in [0, 6]
    # to a combination of button presses.
    # *** There is no way to control rolling left or right. ***
    # *** Momentum determines direction of roll. ***
    BUTTONS = [
        # 0  1  2  3  4  5  6  7  8  9 10 11
        # A  B  C     ^  v  <  >
        [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ],  # 0 - stand still
        [ 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0 ],  # 1 - right
        [ 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0 ],  # 2 - jump right
        [ 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ],  # 3 - jump
        [ 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0 ],  # 4 - jump left
        [ 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0 ],  # 5 - left
        [ 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0 ],  # 6 - crouch/roll
        # TODO: Do we need spin dash?
    ]

    # --------------------------------- METHODS -------------------------------

    # returns button presses which correspond with standing still
    def stand_still() -> list:
        return ActionSpace.BUTTONS[ActionSpace.STAND_STILL]

    # returns button presses which correspond with moving/running right
    def move_right() -> list:
        return ActionSpace.BUTTONS[ActionSpace.RIGHT]

    def jump_right() -> list:
        return ActionSpace.BUTTONS[ActionSpace.JUMP_RIGHT]

    def jump() -> list:
        return ActionSpace.BUTTONS[ActionSpace.JUMP]

    def jump_left() -> list:
        return ActionSpace.BUTTONS[ActionSpace.JUMP_LEFT]

    def move_left() -> list:
        return ActionSpace.BUTTONS[ActionSpace.LEFT]

    def crouch() -> list:
        return ActionSpace.BUTTONS[ActionSpace.CROUCH]

    def roll() -> list:
        return ActionSpace.BUTTONS[ActionSpace.ROLL]

    # returns button presses as a list/array by index.
    # see class ActionSpace fields for aliases for each index
    def move(index) -> list:
        return ActionSpace.BUTTONS[index]

    # Returns the number of possible moves (7 moves)
    def get_n_moves() -> int:
        return len(ActionSpace.BUTTONS)

    # Converts button presses to a string representing the action
    def to_string(buttons) -> str:
        if buttons == ActionSpace.BUTTONS[ActionSpace.STAND_STILL]:
            return 'X'
        if buttons == ActionSpace.BUTTONS[ActionSpace.RIGHT]:
            return '>'
        if buttons == ActionSpace.BUTTONS[ActionSpace.JUMP_RIGHT]:
            return '/'
        if buttons == ActionSpace.BUTTONS[ActionSpace.JUMP]:
            return '|'
        if buttons == ActionSpace.BUTTONS[ActionSpace.JUMP_LEFT]:
            return '\\'
        if buttons == ActionSpace.BUTTONS[ActionSpace.LEFT]:
            return '<'
        if buttons == ActionSpace.BUTTONS[ActionSpace.CROUCH]:
            return 'o'

    def to_string_big(buttons) -> str:
        if buttons == ActionSpace.BUTTONS[ActionSpace.STAND_STILL]:
            return 'XXXXXXX'
        if buttons == ActionSpace.BUTTONS[ActionSpace.RIGHT]:
            return '    -->'
        if buttons == ActionSpace.BUTTONS[ActionSpace.JUMP_RIGHT]:
            return '   |-->'
        if buttons == ActionSpace.BUTTONS[ActionSpace.JUMP]:
            return '   |   '
        if buttons == ActionSpace.BUTTONS[ActionSpace.JUMP_LEFT]:
            return '<--|   '
        if buttons == ActionSpace.BUTTONS[ActionSpace.LEFT]:
            return '<--    '
        if buttons == ActionSpace.BUTTONS[ActionSpace.CROUCH]:
            return 'vvvvvvv'

    # Returns true if buttons is the button press of a jump action
    def is_jump(buttons) -> bool:
        return (buttons == ActionSpace.jump()
                or buttons == ActionSpace.jump_right()
                or buttons == ActionSpace.jump_left())

/source/learning/reward_system.py

import os
import sys
import numpy as np
from a_queue import *
from action_space import ActionSpace

script_dir = os.path.dirname(os.path.abspath(__file__))
project_dir = os.path.abspath(script_dir + "/../..")

# Calculates a reward which can be used in reinforcement learning.
# Complements the reward that is automatically calculated by gym retro.
# class RewardSystem allows adding more rewards to it.
# An object of RewardSystem should call calc_reward() every frame in order to calculate rewards accurately.
# Skipping frames might cause some rewards to be over/under calculated.
class RewardSystem:

    # --------------------------------- Class XPos ----------------------------
    # Mirrors the contest XPos reward: calculates a reward from raw x position
    # at a rate of 1 point for each pixel traveled to the right.
    class XPos:
        def __init__(self):
            self.__x_rew = 1   # reward for moving 1 pixel to the right
            self.__x_prev = 0  # coordinate of last x position

        def init(self, info) -> None:
            self.__x_prev = info['x']

        def calc_reward(self, info, action) -> int:
            # rew is new pos - old pos
            rew = (info['x'] - self.__x_prev) * self.__x_rew  # can change scale if interested
            # set old pos to new pos for next iteration
            self.__x_prev = info['x']
            return rew

        def to_string(self) -> str:
            return "XPos"

    # --------------------------------- Allow Backtracking -------------------------

    class Backtracking:
        def __init__(self):
            pass

        def init(self, info) -> None:
            self.__x_rew = 1  # reward for moving 1 pixel to the right
            self._max_x = 0

        def calc_reward(self, info, action) -> int:
            # rew is how far we moved beyond the furthest x reached so far
            rew = max(0, info['x'] - self._max_x) * self.__x_rew
            # set new max for next iteration
            self._max_x = max(self._max_x, info['x'])
            return rew

        def to_string(self) -> str:
            return "Allow Backtracking"

    # --------------------------------- Class Contest -------------------------

    class Contest:

        def __init__(self):
            pass

        def init(self, info) -> None:
            self.__end_x = info['screen_x_end']
            self.__prev_progress = 0
            self.__frame = 0

        def calc_progress(self, info):
            return info['x'] / self.__end_x

        def calc_reward(self, info, action) -> int:
            progress = self.calc_progress(info)
            rew = (progress - self.__prev_progress) * 9000
            self.__prev_progress = progress

            # Reward for completing level quickly
            if progress >= 1:
                rew = rew + (1 - np.clip(self.__frame / 18000, 0, 1)) * 1000
            self.__frame += 1
            return rew

        def to_string(self) -> str:
            return "Contest"

    # --------------------------------- Class Complex -------------------------

    class Complex:

        def __init__(self):
            self.__frame_counter = 0

            # Reward weights:
            # Specifies how good each action is.
            # Good actions are positive, bad actions are negative.
            self.__ring_rew = 1000            # reward for each ring collected
            self.__ring_loss_rew = 0          # -10  # penalty for losing any number of rings
            self.__ring_deficient_rew = -5    # penalty for not having rings (applied every frame we don't have rings)

            self.__ring_count = 0             # how many rings do we have

            self.__robot_rew = 1              # reward for destroying each robot
            self.__robot_count = 0            # how many robots have been destroyed

            self.__score_rew = 10             # 10 points for every new point scored
            self.__score_count = 0            # how many points do we have

            self.__life_rew = 1000            # reward for collecting an extra life
            self.__life_penalty = -self.__ring_loss_rew  # penalty for dying
            self.__life_count = 0             # how many lives do we have

            self.__x_rew = 1                  # reward for moving 1 pixel to the right
            self.__x_prev = 0                 # coordinate of last x position

            self.__x_explore_rew = 10         # reward for exploring 1 pixel further than before
            self.__x_max = 0                  # the furthest right we have moved along the x axis

            self.__y_prev = 0                 # coordinate of last y position

            self.__items_rew = 1              # reward for collecting item boxes

            # --- Location Specific Rewards ---
            # self.__location_rewards = { '' }

            self.__jump_rew = -20             # penalty for each jump
            self.__jump_history = AQueue()    # timestamps of most recent jumps
            self.__jump_tolerance_count = 2   # allows jumping this many times without penalty per period
            self.__jump_tolerance_period = 10 # length of that period in frames

        # Sets initial conditions of the current epoch.
        # Some rewards are based on previous actions.
        # This method sets the initial conditions of the new epoch so that rewards can be based on them.
        # Sets things like ring count, current x position, and current score.
        # Call this method whenever the game is reset or parts of a level are skipped.
        # ! This is not a constructor !
        def init(self, info) -> None:
            # TODO: More reward/penalty ideas
            # Penalty (for getting stuck): trying to move right but not increasing 'x'
            # Penalty (for getting stuck): trying to move left but not decreasing 'x'
            # Penalty: for losing a life. Getting hit without rings.
            self.__frame_counter = 0
            self.__ring_count = info['rings']
            self.__robot_count = 0  # TODO: ???
            self.__score_count = info['score']
            self.__life_count = info['lives']
            self.__x_prev = info['x']
            self.__x_max = self.__x_prev
            self.__y_prev = info['y']
            self.__jump_history.clear()

        # Calculates reward based on the game state
        # info    - contains game state information like position, score, ring count, and speed
        # action  - the most recent action made by the agent, stored as a list of ints. See ActionSpace.
        # returns - the recalculated reward as an int
        def calc_reward(self, info, action) -> int:
            self.__frame_counter += 1  # increment frame counter

            reward = 0

            reward += self.__calc_ring_reward(info)
            reward += self.__calc_robot_reward(info)   # TODO: this doesn't do anything yet
            reward += self.__calc_score_reward(info)
            reward += self.__calc_life_reward(info)
            reward += self.__calc_x_reward(info)
            reward += self.__calc_items_reward(info)   # TODO: this doesn't do anything yet
            reward += self.__calc_jump_reward(action)

            return reward
        # Calculates reward for collecting/losing rings
        def __calc_ring_reward(self, info) -> int:
            rings_curr = info['rings']
            ring_diff = rings_curr - self.__ring_count

            self.__ring_count = rings_curr

            # --- Reward for collecting/losing rings ---
            reward = 0

            if ring_diff >= 0:
                # reward for collecting each ring
                reward += self.__ring_rew * ring_diff
            else:
                # penalize for losing any number of rings
                reward += self.__ring_loss_rew

            # --- Penalty for not having rings ---
            if rings_curr == 0:
                reward += self.__ring_deficient_rew

            return reward

        # Calculates reward for destroying a robot
        def __calc_robot_reward(self, info) -> int:
            # TODO: Don't know
            return 0

        # Calculates reward for increasing score (this will overlap with other rewards but it will still work)
        def __calc_score_reward(self, info) -> int:
            score_curr = info['score']
            score_diff = score_curr - self.__score_count

            self.__score_count = score_curr

            return self.__score_rew * score_diff

        # Calculates a reward for collecting a life (or "one up")
        def __calc_life_reward(self, info) -> int:
            rew = 0

            life_curr = info['lives']
            life_diff = life_curr - self.__life_count

            # Did we gain or lose a life?
            if life_diff >= 0:
                # We gained a life :)
                rew += self.__life_rew * life_diff
            else:
                # We lost a life :(
                rew += self.__life_penalty * life_diff

            self.__life_count = life_curr

            return rew

        # Calculates reward for moving right
        def __calc_x_reward(self, info) -> int:
            x_curr = info['x']
            x_diff = x_curr - self.__x_prev     # how much did we move since last frame (same as x velocity)
            x_explored = x_curr - self.__x_max  # how much further right did we move than before

            self.__x_max = (x_curr if x_curr > self.__x_max else self.__x_max)
            self.__x_prev = x_curr

            reward = 0

            # Reward for every new pixel we move right
            if x_explored > 0:
                reward += self.__x_explore_rew * x_explored

            # Reward/penalize for every pixel, since last frame, we moved right/left
            reward += self.__x_rew * x_diff

            return reward

        def __calc_items_reward(self, info) -> int:
            # TODO: I don't know
            return 0

        # Calculates penalty for jumping.
        # Only penalizes for jumping more than the allowed number of times.
        # The agent is allowed to jump without penalty x number of times during any period of y frames.
        # For every extra jump, the agent is penalized.
        def __calc_jump_reward(self, action) -> int:
            rew = 0

            # Did we jump?
            if ActionSpace.is_jump(action):
                # Yes. We jumped.

                # Record this jump.
                self.__jump_history.push(self.__frame_counter)  # we jumped at this frame

                # Penalize, but only if we jumped too much.
                if self.__jump_history.size() > self.__jump_tolerance_count:
                    rew = self.__jump_rew

            # Update jump history. Remove old jumps.
            while self.__jump_history.size() > 0:
                oldest = self.__jump_history.front()

                if oldest + self.__jump_tolerance_period <= self.__frame_counter:
                    self.__jump_history.pop()  # remove oldest jump
                else:
                    break

            return rew

        def to_string(self) -> str:
            return "Complex"