DeepQ Tuning
Moving forward with Deep Q Learning as the model for our reinforcement learning agent, the team shifted its efforts toward optimizing that model. Because Deep Q Learning maps image observations from the emulator to button-press actions that maximize expected reward, we identified three areas for experimentation: image input, action space, and reward structure.
The OpenAI Gym Retro emulator runs at 60 frames per second (fps). However, we posited that this frame rate is faster than a human could process an image, make a decision, and press a button. Therefore, we did not need to map an action to every emulator frame and could take advantage of a reduced effective frame rate in several ways. First, we modified our Deep Q agent to take 4 concatenated consecutive frames as input, which provides richer contextual information to the model, such as direction of travel and momentum. Next, we applied stochastic frame skipping to these images prior to concatenation: rather than seeing 4 directly consecutive frames, the environment may hold the chosen action for up to 4 frames before the next frame is concatenated. This effectively reduces our frame rate to 15 fps, which better aligns with human button-press times.
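The frame-stacking half of this change is not shown in the wrapper code below, so here is a minimal, self-contained sketch of the idea. It assumes channel-last image observations and is illustrative rather than a copy of our training code:

import numpy as np
from collections import deque

class FrameStack:
    # Keeps the last k observations and returns them concatenated along the
    # channel axis, so the agent can infer direction of travel and momentum.
    def __init__(self, k=4):
        self.k = k
        self.frames = deque(maxlen=k)

    def reset(self, obs):
        # Fill the buffer with copies of the first frame so the stack is always full.
        for _ in range(self.k):
            self.frames.append(obs)
        return np.concatenate(self.frames, axis=-1)

    def step(self, obs):
        self.frames.append(obs)
        return np.concatenate(self.frames, axis=-1)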
According to research by Google's DeepMind, Deep Q Learning performs best with relatively small, discrete action spaces. However, the emulated Sega Genesis controller has 12 buttons, which can be pressed in 2^12 = 4,096 combinations. Therefore, we opted to reduce the action space to the most viable moves for Sonic to make in the environment, eliminating redundant combinations such as pressing up and down at the same time. Our final action space contains 7 basic actions: stand still, jump, walk right, jump right, walk left, jump left, and crouch/roll.
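One way to plug this reduced action space into the emulator is a small gym ActionWrapper that translates a discrete index into the 12-element button array. This is a sketch of the idea, not necessarily how our training code is wired up; the class name DiscreteSonicActions is hypothetical, and it reuses the ActionSpace class listed below:

import gym
from action_space import ActionSpace  # listed under /source/interface/action_space.py below

class DiscreteSonicActions(gym.ActionWrapper):
    # Lets the agent choose an integer in [0, 7), which is translated into the
    # 12-element button array expected by the Genesis emulator.
    def __init__(self, env):
        super().__init__(env)
        self.action_space = gym.spaces.Discrete(ActionSpace.get_n_moves())

    def action(self, act):
        return ActionSpace.move(act)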
The OpenAI Gym Retro environment provides several reward functions as baselines to inform the agent of the success of its previous move. These include raw “x position”, which indicates how far right Sonic has moved in the environment, and “contest”, which scales x position by level length and adds a bonus for completing a level quickly. The team added two reward functions for experimentation. First, we added a modified version of “contest” that does not penalize backtracking, allowing Sonic to move left to gain momentum for overcoming tall obstacles. Second, we added a complex reward function that rewards relative x position, collecting rings, and eliminating enemies, and penalizes excessive jumping in order to prioritize forward momentum. Our most successful runs used the “backtracking” reward function.
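To make the difference between the per-step x reward and the backtracking variant concrete, here is a toy calculation (the numbers are illustrative, not taken from a run):

# Sonic is at x = 500, which is also the furthest right he has reached (x_max),
# and steps 10 pixels to the left to build momentum for a jump.
x_prev, x_max, x_curr = 500, 500, 490

x_position_reward = x_curr - x_prev           # -10: moving left is penalized
backtracking_reward = max(0, x_curr - x_max)  #   0: moving left costs nothing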
Our Code:
/source/interface/wrappers.py
import numpy as np
import gym

class StochasticFrameSkip(gym.Wrapper):
    def __init__(self, env, n, stickprob):
        gym.Wrapper.__init__(self, env)
        self.n = n
        self.stickprob = stickprob
        self.curac = None
        self.rng = np.random.RandomState()
        self.supports_want_render = hasattr(env, "supports_want_render")

    def reset(self, **kwargs):
        self.curac = None
        return self.env.reset(**kwargs)

    def step(self, ac):
        done = False
        totrew = 0
        for i in range(self.n):
            # First step after reset, use action
            if self.curac is None:
                self.curac = ac
            # First substep, delay with probability=stickprob
            elif i == 0:
                if self.rng.rand() > self.stickprob:
                    self.curac = ac
            # Second substep, new action definitely kicks in
            elif i == 1:
                self.curac = ac
            if self.supports_want_render and i < self.n - 1:
                ob, rew, done, info = self.env.step(self.curac, want_render=False)
            else:
                ob, rew, done, info = self.env.step(self.curac)
            totrew += rew
            if done:
                break
        return ob, totrew, done, info
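A minimal usage sketch for the wrapper above; the stickprob value is an assumed example, not necessarily what our training runs use:

import retro
from wrappers import StochasticFrameSkip

env = retro.make(game='SonicTheHedgehog-Genesis', state='LabyrinthZone.Act1')
# Hold each chosen action for up to 4 emulator frames (60 fps -> ~15 fps effective),
# occasionally "sticking" with the previous action for one extra substep.
env = StochasticFrameSkip(env, n=4, stickprob=0.25)  # stickprob=0.25 is an assumed value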
/source/interface/action_space.py
# Represents all possible moves.
# Converts moves to button presses which can be used with env.step()
#
# ex:
#   # returns an array of 12 ints representing the button presses for this action
#   buttons = ActionSpace.move_right()
#
#   env = retro.make(game='SonicTheHedgehog-Genesis', state='LabyrinthZone.Act1')
#   env.step(buttons)  # sends move to game emulator
#
# ActionSpace is treated like a namespace. It is not intended to be instantiated.

class ActionSpace:
    # --------------------------------- FIELDS --------------------------------

    # Index of each possible move.
    # These values are constant and should not be changed at runtime.
    STAND_STILL = 0
    RIGHT = 1
    JUMP_RIGHT = 2
    JUMP = 3
    JUMP_LEFT = 4
    LEFT = 5
    CROUCH = 6
    ROLL = CROUCH
    # TODO: Do we need one for spin dash?

    # Lookup table which maps an action specified by an index [0, 6]
    # to a combination of button presses.
    # *** There is no way to control rolling left or right. ***
    # *** Momentum determines direction of roll. ***
    BUTTONS = [
        #  0  1  2  3  4  5  6  7  8  9 10 11
        #  A  B  C     ^  v  <  >
        [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ],  # 0 - stand still
        [ 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0 ],  # 1 - right
        [ 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0 ],  # 2 - jump right
        [ 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ],  # 3 - jump
        [ 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0 ],  # 4 - jump left
        [ 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0 ],  # 5 - left
        [ 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0 ],  # 6 - crouch/roll
        # TODO: Do we need spin dash?
    ]

    # --------------------------------- METHODS -------------------------------

    # returns button presses which correspond with standing still
    def stand_still() -> list:
        return ActionSpace.BUTTONS[ActionSpace.STAND_STILL]

    # returns button presses which correspond with moving/running right
    def move_right() -> list:
        return ActionSpace.BUTTONS[ActionSpace.RIGHT]

    def jump_right() -> list:
        return ActionSpace.BUTTONS[ActionSpace.JUMP_RIGHT]

    def jump() -> list:
        return ActionSpace.BUTTONS[ActionSpace.JUMP]

    def jump_left() -> list:
        return ActionSpace.BUTTONS[ActionSpace.JUMP_LEFT]

    def move_left() -> list:
        return ActionSpace.BUTTONS[ActionSpace.LEFT]

    def crouch() -> list:
        return ActionSpace.BUTTONS[ActionSpace.CROUCH]

    def roll() -> list:
        return ActionSpace.BUTTONS[ActionSpace.ROLL]

    # returns button presses as a list/array by index.
    # see class ActionSpace fields for aliases for each index
    def move(index) -> list:
        return ActionSpace.BUTTONS[index]

    # Returns the number of possible moves (7 moves)
    def get_n_moves() -> int:
        return len(ActionSpace.BUTTONS)

    # Converts button presses to a string representing the action
    def to_string(buttons) -> str:
        if buttons == ActionSpace.BUTTONS[ActionSpace.STAND_STILL]:
            return 'X'
        if buttons == ActionSpace.BUTTONS[ActionSpace.RIGHT]:
            return '>'
        if buttons == ActionSpace.BUTTONS[ActionSpace.JUMP_RIGHT]:
            return '/'
        if buttons == ActionSpace.BUTTONS[ActionSpace.JUMP]:
            return '|'
        if buttons == ActionSpace.BUTTONS[ActionSpace.JUMP_LEFT]:
            return '\\'
        if buttons == ActionSpace.BUTTONS[ActionSpace.LEFT]:
            return '<'
        if buttons == ActionSpace.BUTTONS[ActionSpace.CROUCH]:
            return 'o'

    def to_string_big(buttons) -> str:
        if buttons == ActionSpace.BUTTONS[ActionSpace.STAND_STILL]:
            return 'XXXXXXX'
        if buttons == ActionSpace.BUTTONS[ActionSpace.RIGHT]:
            return '   -->'
        if buttons == ActionSpace.BUTTONS[ActionSpace.JUMP_RIGHT]:
            return '  |-->'
        if buttons == ActionSpace.BUTTONS[ActionSpace.JUMP]:
            return '   |  '
        if buttons == ActionSpace.BUTTONS[ActionSpace.JUMP_LEFT]:
            return '<--|  '
        if buttons == ActionSpace.BUTTONS[ActionSpace.LEFT]:
            return '<--   '
        if buttons == ActionSpace.BUTTONS[ActionSpace.CROUCH]:
            return 'vvvvvvv'

    # Returns true if buttons is the button press of a jump action
    def is_jump(buttons) -> bool:
        return buttons == ActionSpace.jump() or buttons == ActionSpace.jump_right() or buttons == ActionSpace.jump_left()
/source/learning/reward_system.py
import os
import sys
import numpy as np
from a_queue import *
from action_space import ActionSpace

script_dir = os.path.dirname(os.path.abspath(__file__))
project_dir = os.path.abspath(script_dir + "/../..")

# Calculates a reward which can be used in reinforcement learning.
# Complements the reward that is automatically calculated by gym retro.
# class RewardSystem allows adding more rewards to it.
# An object of RewardSystem should call calc_reward() every frame in order to calculate rewards accurately.
# Skipping frames might cause some rewards to be over/under calculated.
class RewardSystem:
    # --------------------------------- Class XPos -----------------------------
    # Mirrors the contest XPos reward. Calculates rew based on raw x position at a rate of 1 pt per x traveled.
    class XPos:
        def __init__(self):
            self.__x_rew = 1   # reward for moving 1 pixel to the right
            self.__x_prev = 0  # coordinate of last x position

        def init(self, info) -> None:
            self.__x_prev = info['x']

        def calc_reward(self, info, action) -> int:
            # rew is new pos - old pos
            rew = (info['x'] - self.__x_prev) * self.__x_rew  # can change scale if interested
            # set old pos to new pos for next iteration
            self.__x_prev = info['x']
            return rew

        def to_string(self) -> str:
            return "XPos"
    # --------------------------------- Allow Backtracking --------------------

    class Backtracking():
        def __init__(self):
            pass

        def init(self, info) -> None:
            self.__x_rew = 1  # reward for each new pixel of rightward progress
            self._max_x = 0

        def calc_reward(self, info, action) -> int:
            # rew is based on the maximum x traveled so far
            rew = max(0, info['x'] - self._max_x) * self.__x_rew
            # set new max for next iteration
            self._max_x = max(self._max_x, info['x'])
            return rew

        def to_string(self) -> str:
            return "Allow Backtracking"
    # --------------------------------- Class Contest --------------------------

    class Contest:
        def __init__(self):
            pass

        def init(self, info) -> None:
            self.__end_x = info['screen_x_end']
            self.__prev_progress = 0
            self.__frame = 0

        def calc_progress(self, info):
            return info['x'] / self.__end_x

        def calc_reward(self, info, action) -> int:
            progress = self.calc_progress(info)
            rew = (progress - self.__prev_progress) * 9000
            self.__prev_progress = progress

            # Reward for completing level quickly
            if progress >= 1:
                rew = rew + (1 - np.clip(self.__frame / 18000, 0, 1)) * 1000

            self.__frame += 1
            return rew

        def to_string(self) -> str:
            return "Contest"
    # --------------------------------- Class Complex --------------------------

    class Complex:
        def __init__(self):
            self.__frame_counter = 0

            # Reward weights:
            # Specifies how good each action is.
            # good actions are positive
            # bad actions are negative
            self.__ring_rew = 1000            # reward for each ring collected
            self.__ring_loss_rew = 0          # (was -10) penalty for losing any number of rings
            self.__ring_deficient_rew = -5    # penalty for not having rings (applied every frame we don't have rings)

            self.__ring_count = 0             # how many rings do we have

            self.__robot_rew = 1              # reward for destroying each robot
            self.__robot_count = 0            # how many robots have been destroyed

            self.__score_rew = 10             # 10 points for every new point scored
            self.__score_count = 0            # how many points do we have

            self.__life_rew = 1000            # reward for collecting an extra life
            self.__life_penalty = -self.__ring_loss_rew  # penalty for dying
            self.__life_count = 0             # how many lives do we have

            self.__x_rew = 1                  # reward for moving 1 pixel to the right
            self.__x_prev = 0                 # coordinate of last x position

            self.__x_explore_rew = 10         # reward for exploring 1 pixel further than before
            self.__x_max = 0                  # the furthest right we have moved along the x axis

            self.__y_prev = 0                 # coordinate of last y position

            self.__items_rew = 1              # reward for collecting item boxes

            # --- Location Specific Rewards ---
            # self.__location_rewards = { '' }

            self.__jump_rew = -20             # penalty for each jump over the tolerance
            self.__jump_history = AQueue()    # timestamps of most recent jumps
            self.__jump_tolerance_count = 2   # number of jumps allowed without penalty per tolerance period
            self.__jump_tolerance_period = 10 # length of the tolerance period, in frames

        # Sets initial conditions of the current epoch.
        # Some rewards are based on previous actions.
        # This method sets the initial conditions of the new epoch so that rewards can be based on them.
        # Sets things like ring count, current x position, and current score.
        # Call this method whenever the game is reset or parts of a level are skipped.
        # ! This is not a constructor !
        def init(self, info) -> None:
            # TODO: More reward/penalty ideas
            #   Penalty (for getting stuck): trying to move right but not increasing 'x'
            #   Penalty (for getting stuck): trying to move left but not decreasing 'x'
            #   Penalty: for losing a life / getting hit without rings.
            self.__frame_counter = 0
            self.__ring_count = info['rings']
            self.__robot_count = 0  # TODO: ???
            self.__score_count = info['score']
            self.__life_count = info['lives']
            self.__x_prev = info['x']
            self.__x_max = self.__x_prev
            self.__y_prev = info['y']
            self.__jump_history.clear()
        # Calculates reward based on the environment state.
        #   info   - contains game state information like position, score, ring count and speed
        #   action - the most recent action made by the agent, stored as a list of ints. See ActionSpace.
        #   returns the recalculated reward as an int
        def calc_reward(self, info, action) -> int:
            self.__frame_counter += 1  # increment frame counter

            reward = 0

            reward += self.__calc_ring_reward(info)
            reward += self.__calc_robot_reward(info)  # TODO: this doesn't do anything yet
            reward += self.__calc_score_reward(info)
            reward += self.__calc_life_reward(info)
            reward += self.__calc_x_reward(info)
            reward += self.__calc_items_reward(info)  # TODO: this doesn't do anything yet
            reward += self.__calc_jump_reward(action)

            return reward

        # Calculates reward for collecting/losing rings
        def __calc_ring_reward(self, info) -> int:
            rings_curr = info['rings']
            ring_diff = rings_curr - self.__ring_count

            self.__ring_count = rings_curr

            # --- Reward for collecting/losing rings ---
            reward = 0

            if ring_diff >= 0:
                # reward for collecting each ring
                reward += self.__ring_rew * ring_diff
            else:
                # penalize for losing any number of rings
                reward += self.__ring_loss_rew

            # --- Penalty for not having rings ---
            if rings_curr == 0:
                reward += self.__ring_deficient_rew

            return reward
        # Calculates reward for destroying a robot
        def __calc_robot_reward(self, info) -> int:
            # TODO: Don't know
            return 0

        # Calculates reward for increasing score (this will overlap with other rewards but it will still work)
        def __calc_score_reward(self, info) -> int:
            score_curr = info['score']
            score_diff = score_curr - self.__score_count

            self.__score_count = score_curr

            return self.__score_rew * score_diff

        # Calculates a reward for collecting a life (or "one up")
        def __calc_life_reward(self, info) -> int:
            rew = 0

            life_curr = info['lives']
            life_diff = life_curr - self.__life_count

            # Did we gain or lose a life?
            if life_diff >= 0:
                # We gained a life :)
                rew += self.__life_rew * life_diff
            else:
                # We lost a life :(
                rew += self.__life_penalty * life_diff

            self.__life_count = life_curr

            return rew

        # Calculates reward for moving right
        def __calc_x_reward(self, info) -> int:
            x_curr = info['x']
            x_diff = x_curr - self.__x_prev     # how much did we move since last frame (same as x velocity)
            x_explored = x_curr - self.__x_max  # how much further right did we move than ever before

            self.__x_max = (x_curr if x_curr > self.__x_max else self.__x_max)
            self.__x_prev = x_curr

            reward = 0

            # Reward for every new pixel we move right
            if x_explored > 0:
                reward += self.__x_explore_rew * x_explored

            # Reward/penalize for every pixel, since last frame, we moved right/left
            reward += self.__x_rew * x_diff

            return reward
        def __calc_items_reward(self, info) -> int:
            # TODO: I don't know
            return 0

        # Calculates penalty for jumping.
        # Only penalizes for jumping more than the allowed number of times.
        # The agent may jump without penalty a set number of times during any tolerance period of frames.
        # For every extra jump, the agent is penalized.
        def __calc_jump_reward(self, action) -> int:
            rew = 0

            # Did we jump?
            if ActionSpace.is_jump(action):
                # Yes. We jumped.

                # Record this jump.
                self.__jump_history.push(self.__frame_counter)  # we jumped at this frame

                # Penalize, but only if we jumped too much.
                if self.__jump_history.size() > self.__jump_tolerance_count:
                    rew = self.__jump_rew

            # Update jump history. Remove jumps older than the tolerance period.
            while self.__jump_history.size() > 0:
                oldest = self.__jump_history.front()

                if oldest + self.__jump_tolerance_period <= self.__frame_counter:
                    self.__jump_history.pop()  # remove oldest jump
                else:
                    break

            return rew

        def to_string(self) -> str:
            return "Complex"
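A minimal sketch of how one of these reward classes might be driven from the environment loop; the surrounding agent code is assumed, and the info keys (such as 'x') come from the Gym Retro scenario configuration used in our project:

import retro
from reward_system import RewardSystem
from action_space import ActionSpace

env = retro.make(game='SonicTheHedgehog-Genesis', state='LabyrinthZone.Act1')
rewards = RewardSystem.Backtracking()

obs = env.reset()
obs, _, done, info = env.step(ActionSpace.stand_still())  # take one step so `info` is populated
rewards.init(info)                                        # set the initial conditions for this episode

while not done:
    buttons = ActionSpace.move_right()                    # placeholder for the agent's chosen action
    obs, _, done, info = env.step(buttons)
    rew = rewards.calc_reward(info, buttons)              # custom reward used in place of the built-in one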