import pygame as pg
from math import sin, cos, pi, ceil, floor
import torch as T
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from numpy.random import random as nprand
import matplotlib.pyplot as plt
from collections import OrderedDict
import time, os
import csv

# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

WIDTH, HEIGHT = 800, 600
m = 1  # drone mass
g = 4  # grav. acceleration
dt = 2 / 60  # simulation timestep
l = 1  # length of the base
eng_l = 0.25  # length of each engine (there are two of them, on the left and on the right)
d = 0.25  # height of both the base and the engines
drag = 0.1  # drag coefficient
maxthr = 4  # max engine thrust
thr_incr = maxthr * dt / 0.5  # increment by which the thrust changes per frame while a key is held
I = m * (l + 2 * eng_l) ** 2 / 12  # moment of inertia of a thin rod
fontsize = 18
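
# For reference, what the constants above work out to (plain arithmetic on the values
# defined in this file):
#   I        = 1 * (1 + 2 * 0.25) ** 2 / 12 = 0.1875   (thin rod of total length 1.5)
#   thr_incr = 4 * (2 / 60) / 0.5 = 4 / 15 ≈ 0.267, i.e. a held key ramps an engine
#              from 0 to maxthr in about 0.5 s of simulated time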
pg.init()
font = pg.font.SysFont("arial", fontsize)
# image = pg.image.load("undrtale.png")


class QNet(nn.Module):
    def __init__(self, n_state, n_actions, n_layers, n_neurons, lr=0.001):
        super().__init__()
        self.layers = nn.ModuleList()
        self.len = n_layers
        self.n_state = n_state
        self.n_actions = n_actions
        if n_layers == 1:
            self.layers.append(nn.Linear(n_state, n_actions))
        else:
            self.layers.append(nn.Linear(n_state, n_neurons))
            for i in range(n_layers - 2):
                self.layers.append(nn.Linear(n_neurons, n_neurons))
            self.layers.append(nn.Linear(n_neurons, n_actions))
        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.loss = nn.MSELoss()
        self.device = T.device('cuda' if T.cuda.is_available() else 'cpu')
        self.to(self.device)
        print(f"using {self.device}")

    def forward(self, x):
        # start = time.time_ns() / 1e6
        for i in range(self.len - 1):
            x = F.relu(self.layers[i](x))
        # end = time.time_ns() / 1e6
        # print(f"QNet forward time: {end - start} ms")
        return self.layers[-1](x)
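
# A minimal usage sketch of QNet, assuming the 10-dimensional observation and
# 4 actions used in main() below:
#   net = QNet(n_state=10, n_actions=4, n_layers=2, n_neurons=64)
#   q = net(T.rand(3, 10).to(net.device))  # one Q-value per action -> shape (3, 4)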


class Agent():
    def __init__(self, gamma, eps, lr, n_state, n_actions, batch_size,
                 max_mem=100000, eps_end=0.01, eps_dec=5e-4, n_layers=3, n_neurons=128):
        self.gamma = gamma
        self.eps = eps
        self.eps_min = eps_end
        self.eps_dec = eps_dec
        self.action_space = [i for i in range(n_actions)]
        self.lr = lr
        self.batch_size = batch_size
        self.mem_size = max_mem
        self.mem_countr = 0
        self.eval = QNet(n_state, n_actions, n_layers, n_neurons, lr)
        # replay memory: states, next states, actions, rewards and terminal flags
        self.smemory = np.zeros((self.mem_size, n_state), dtype=np.float32)
        self.nsmemory = np.zeros((self.mem_size, n_state), dtype=np.float32)
        self.amemory = np.zeros(self.mem_size, dtype=np.int32)
        self.rmemory = np.zeros(self.mem_size, dtype=np.float32)
        self.terminalmemory = np.zeros(self.mem_size, dtype=np.bool_)

    def store_transition(self, state, action, reward, newstate, done):
        # overwrite the oldest transition once the buffer is full
        i = self.mem_countr % self.mem_size
        self.smemory[i] = state
        self.amemory[i] = action
        self.rmemory[i] = reward
        self.nsmemory[i] = newstate
        self.terminalmemory[i] = done
        self.mem_countr += 1

    def policy(self, state):
        # epsilon-greedy action selection
        if np.random.random() < self.eps:
            action = np.random.choice(self.action_space)
        else:
            state = T.tensor([state], dtype=T.float32).to(self.eval.device)
            actions = self.eval.forward(state)
            action = T.argmax(actions).item()
        return action

    def learn(self):
        # start = time.time_ns() / 1e6
        if self.mem_countr < self.batch_size:
            return
        self.eval.optimizer.zero_grad()
        # sample a random minibatch from the filled part of the replay memory
        mem = min(self.mem_size, self.mem_countr)
        batch = np.random.choice(mem, self.batch_size, replace=False)
        batch_i = np.arange(self.batch_size, dtype=np.int32)
        state_batch = T.tensor(self.smemory[batch]).to(self.eval.device)
        new_state_batch = T.tensor(self.nsmemory[batch]).to(self.eval.device)
        reward_batch = T.tensor(self.rmemory[batch]).to(self.eval.device)
        terminal_batch = T.tensor(self.terminalmemory[batch]).to(self.eval.device)
        action_batch = self.amemory[batch]  # not necessarily a tensor
        # Q(s, a) for the actions that were actually taken
        q_eval = self.eval.forward(state_batch)[batch_i, action_batch]
        # bootstrapped target computed with the same network (no separate target net)
        nq_eval = self.eval.forward(new_state_batch)
        nq_eval[terminal_batch] = 0.0
        q_target = reward_batch + self.gamma * T.max(nq_eval, dim=1)[0]
        loss = self.eval.loss(q_target, q_eval).to(self.eval.device)
        loss.backward()
        self.eval.optimizer.step()
        self.eps = max(self.eps_min, self.eps - self.eps_dec)
        # end = time.time_ns() / 1e6
        # print(f"Agent learn time: {end - start} ms")

    def save(self, file):
        T.save(self.eval.state_dict(), file)

    def load(self, file):
        self.eval.load_state_dict(T.load(file))
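
# A minimal sketch of how the Agent is driven (the same loop main() runs below,
# shown here for reference):
#   obs = get_observation2(state)
#   action = drone.policy(obs)
#   rew, state = simstep(state, playable=False, action=action)
#   drone.store_transition(obs, action, rew, get_observation2(state), state[-1])
#   drone.learn()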


def reward(x, y, h):
    # x, y: offset from the drone to the cursor; h: drone altitude
    global l, eng_l, d
    collision_punish = 100
    R = 6
    r = (x ** 2 + y ** 2) ** 0.5
    if r > R:
        r = R
    done = h < d + l / 2 + eng_l or abs(x) > 20 or abs(y) > 20
    return ((1 - r / R) * 10 + 1) * 0.01 - collision_punish * int(done), done
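
# For reference (plain arithmetic on the function above): sitting exactly on the
# cursor yields (1 * 10 + 1) * 0.01 = 0.11 per step, the reward decays to 0.01 at
# distance R = 6 or more, and crashing or drifting more than 20 units away ends the
# episode with an extra -100.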


def simstep(state, playable=True, action=None):
    # start = time.time_ns()
    global dt, m, g, l, eng_l, d, drag, maxthr, thr_incr, I
    (x, y, xc, yc, angle,
     vx, vy, vxc, vyc, vangle,
     left_thrust, right_thrust, done) = state
    # cursor (static here: placeholder for cursor movement, so vxc/vyc stay 0)
    prevx = xc
    prevy = yc
    # some code for moving
    vxc = (xc - prevx) / dt
    vyc = (yc - prevy) / dt
    # forces
    fx = -drag * vx - (left_thrust + right_thrust) * sin(angle)
    fy = -m * g - drag * vy + (left_thrust + right_thrust) * cos(angle)
    torque = (right_thrust - left_thrust) * (l + eng_l) / 2 - drag * vangle * 4
    # velocities
    vx += (fx / m) * dt
    vy += (fy / m) * dt
    vangle += (torque / I) * dt
    # position and angle
    x += vx * dt
    y += vy * dt
    angle += vangle * dt
    # keep the angle in (-pi, pi]
    if angle < -pi:
        angle += 2 * pi
    elif angle > pi:
        angle -= 2 * pi
    # Engine control
    if playable:
        # Adjust engine thrusts based on key presses
        if pg.key.get_pressed()[pg.K_LEFT]:
            left_thrust += thr_incr
        else:
            left_thrust -= 2 * thr_incr
        if pg.key.get_pressed()[pg.K_RIGHT]:
            right_thrust += thr_incr
        else:
            right_thrust -= 2 * thr_incr
    else:
        '''
        if action in (1, 5):
            left_thrust -= thr_incr
        if action in (2, 5):
            right_thrust -= thr_incr
        if action in (3, 6):
            left_thrust += thr_incr
        if action in (4, 6):
            right_thrust += thr_incr
        '''
        if action == 0:
            left_thrust -= thr_incr
            right_thrust += thr_incr
        elif action == 1:
            left_thrust += thr_incr
            right_thrust -= thr_incr
        elif action == 2:
            left_thrust += thr_incr
            right_thrust += thr_incr
        elif action == 3:
            left_thrust -= thr_incr
            right_thrust -= thr_incr
    left_thrust = max(0, min(left_thrust, maxthr))
    right_thrust = max(0, min(right_thrust, maxthr))
    rew, done = reward(xc - x, yc - y, y)
    # end = time.time_ns()
    # print(f"sim time: {end - start} ns")
    return (rew,
            [x, y, xc, yc, angle,
             vx, vy, vxc, vyc, vangle,
             left_thrust, right_thrust, done])
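
# A quick consistency check on the dynamics above (plain arithmetic, given m = 1,
# g = 4 and maxthr = 4): hovering upright needs left_thrust + right_thrust = m * g = 4,
# i.e. both engines at maxthr / 2, so the drone has a 2x thrust margin over gravity.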


def get_observation(state):
    global dt
    (x, y, xc, yc, angle,
     vx, vy, vxc, vyc, vangle,
     left_thr, right_thr, done) = state
    return (xc - x, yc - y, y, angle,
            vx, vy, vxc - vx, vyc - vy, vangle,
            left_thr, right_thr)


def get_observation2(state):
    global dt
    (x, y, xc, yc, angle,
     vx, vy, vxc, vyc, vangle,
     left_thr, right_thr, done) = state
    return (xc - x, yc - y, y, sin(angle), cos(angle), vx, vy, vangle, left_thr, right_thr)
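
# Why get_observation2 encodes the angle as (sin, cos): it keeps the observation
# continuous across the ±pi wrap-around. Angles just below +pi and just above -pi
# describe nearly the same orientation but differ by almost 2*pi as raw numbers,
# while their (sin, cos) pairs nearly coincide.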


def render_multi_line(screen, font, text, x, y, color, fsize):
    lines = text.splitlines()
    for i, line in enumerate(lines):
        screen.blit(font.render(line, True, color), (x, y + fsize * i))


def drawgrid(cam, step, substeps, wl=1, dark=100, thin=0):
    w, h, scale, x, y = cam
    surf = pg.Surface((w, h), pg.SRCALPHA, 32)
    x -= w / scale / 2
    y -= h / scale / 2
    xstart = floor(x / step) * step - x
    ystart = y - ceil(y / step) * step
    # horizontal lines
    for i in range(ceil(h / step) * (substeps + 1)):
        if (ystart + i * step / (substeps + 1)) * scale > h:
            break
        weaken = bool(i % (substeps + 1))
        pg.draw.line(surf, (255 - weaken * dark, 255 - weaken * dark, 255 - weaken * dark),
                     (0, (ystart + i * step / (substeps + 1)) * scale),
                     (w, (ystart + i * step / (substeps + 1)) * scale), wl - weaken * thin)
    # vertical lines
    for j in range(ceil(w / step) * (substeps + 1)):
        if (xstart + j * step / (substeps + 1)) * scale > w:
            break
        weaken = bool(j % (substeps + 1))
        pg.draw.line(surf, (255 - weaken * dark, 255 - weaken * dark, 255 - weaken * dark),
                     ((xstart + j * step / (substeps + 1)) * scale, 0),
                     ((xstart + j * step / (substeps + 1)) * scale, h), wl - weaken * thin)
    return surf


def cam_coords(cam, x, y):
    w, h, scale, x0, y0 = cam
    x = (x - x0) * scale + w / 2
    y = (y0 - y) * scale + h / 2
    return x, y
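
# Worked example for cam_coords: with cam = (WIDTH, HEIGHT, 100, x0, y0) the camera
# centre (x0, y0) maps to the middle of the screen, (400, 300), and world y increasing
# upward maps to screen y increasing downward, as pygame expects.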


def render(state, score, screen, cam, scale, w, h):
    '''Render the drone, its engines, and the ground.
    The camera follows the drone (its y clamped to at least 2 in main);
    1 world unit corresponds to `scale` px (100 in main).
    The background is black, the drone is also black with a thin white outline;
    the engines are outlined as well. When they are turned on, little triangles appear,
    which represent air/propellant/whatever. The ground is grey.
    '''
    # Clear the screen
    screen.fill((0, 0, 0))
    global l, eng_l, d, maxthr
    # Unpack the state
    x, y, xc, yc, angle, vx, vy, vxc, vyc, vangle, left_thrust, right_thrust, done = state
    # Draw the ground
    pg.draw.rect(screen, (100, 100, 100), (0, cam_coords(cam, 0, 0)[1], w, h + 1))
    # Draw the grid
    grid = drawgrid(cam, 4, 3, 2, thin=1)
    screen.blit(grid, (0, 0))
    # Calculate the cursor coordinates relative to the camera
    xc, yc = cam_coords(cam, xc, yc)
    # Draw the cursor (clamped to the screen edges so it stays visible)
    pg.draw.circle(screen, (150, 255, 150), (max(min(xc, w), 0), max(min(yc, h), 0)), 0.25 * scale)
    # Draw the drone
    thr_scale = 0.5 * scale
    l_ = l * scale
    eng_l_ = eng_l * scale
    d_ = d * scale
    drone_surf = pg.Surface((l_ + 2 * eng_l_, d_ + 2 * thr_scale), pg.SRCALPHA, 32)
    pg.draw.rect(drone_surf, (255, 255, 255), (0, thr_scale, eng_l_, d_), 2)  # left engine
    pg.draw.rect(drone_surf, (255, 255, 255), (eng_l_, thr_scale, l_, d_), 2)  # base
    pg.draw.rect(drone_surf, (255, 255, 255), (l_ + eng_l_, thr_scale, eng_l_, d_), 2)  # right engine
    pg.draw.polygon(drone_surf, (255, 255, 200),
                    [(0, d_ + thr_scale),
                     (eng_l_ // 2, d_ + (1 + left_thrust / maxthr) * thr_scale),
                     (eng_l_, d_ + thr_scale)])  # left flame
    pg.draw.polygon(drone_surf, (255, 255, 200),
                    [(l_ + eng_l_, d_ + thr_scale),
                     (l_ + eng_l_ + eng_l_ // 2, d_ + (1 + right_thrust / maxthr) * thr_scale),
                     (l_ + 2 * eng_l_, d_ + thr_scale)])  # right flame
    drone_surf = pg.transform.rotate(drone_surf, angle / pi * 180)
    drone_rect = drone_surf.get_rect()
    drone_rect.center = cam_coords(cam, x, y)
    screen.blit(drone_surf, drone_rect)
    # Print information & "HUD"
    # global image
    # screen.blit(image, (0, 500))
    winfo = 3
    trnsprt = 180
    hud = pg.Surface((fontsize * 18 + 2 * winfo, fontsize * 8 + 2 * winfo), pg.SRCALPHA, 32)
    pg.draw.rect(hud, (180, 180, 180, trnsprt), (0, 0, fontsize * 18 + 2 * winfo, fontsize * 8 + 2 * winfo))
    pg.draw.rect(hud, (0, 0, 0, trnsprt), (winfo, winfo, fontsize * 18, fontsize * 8))
    render_multi_line(hud, font,
                      f'Coords: ({x:.2f}, {y:.2f}); angle: {angle:.2f}\n'
                      f'Velocity: ({vx:.2f}, {vy:.2f}); angular: {vangle:.2f}\n'
                      f'Thrusters: left: {left_thrust:.2f}; right: {right_thrust:.2f}\n'
                      f'Score: {score:.2f}',
                      20, 20, (255, 255, 255), fontsize * 2)
    screen.blit(hud, (0, 0))
    return screen


def plot_progress(x, scores, file):
    plt.scatter(x, scores, s=1 / 4, c=((0.3, 0.6, 0.8),), linewidth=0)
    plt.savefig(file, dpi=300)


def writedata(file, row):
    # append a single CSV row (the call sites pass the row as a list)
    with open(file, "a", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(row)


def main():
    print(T.cuda.is_available())
    scale = 100
    dronename = "smol"
    if not os.path.exists(dronename):
        os.mkdir(dronename)
    writedata(f"{dronename}/data.csv", ["i", "score", "mean_score"])
    screen = pg.display.set_mode((WIDTH, HEIGHT))
    pg.display.set_caption('Drone thingy')
    clock = pg.time.Clock()
    do_render = False
    playable = False
    n_games = 500000
    ## observation (not-exactly-state): [xc', yc', h, angle, vx, vy, vxc', vyc', vangle, left_thr, right_thr] - 11
    # observation2: [xc', yc', h, sin, cos, vx, vy, vangle, left_thr, right_thr] - 10
    ## actions = (0:nothing, 1:left-, 2:right-, 3:left+, 4:right+, 5:both-, 6:both+) - 7
    # actions2 = (0:left_roll, 1:right_roll, 2:both+, 3:both-) - 4
    drone = Agent(0.995, 1, 0.001, n_state=10, n_actions=4, batch_size=64, n_layers=2, n_neurons=64, eps_dec=1e-5)
    if os.path.exists(f"{dronename}/drone.pt"):
        drone.load(f"{dronename}/drone.pt")
    scores, epss = np.array([], dtype=np.float32), np.array([], dtype=np.float32)
    maxcount = 1000
    states = []
    n_states = 10
    for i in range(n_games):
        # [x, y, xc, yc, angle, vx, vy, vxc, vyc, vangle, left_thrust, right_thrust, done]
        state = [(2 * nprand() - 1) * 10, 2 + nprand() * 8,  # x, y
                 (2 * nprand() - 1) * 10, -2 + nprand() * 12,  # xc, yc
                 pi * (2 * nprand() - 1) * 0.1,  # angle
                 (2 * nprand() - 1) * 1, (1.5 * nprand() - 0.5) * 1,  # vx, vy
                 0, 0,  # vxc, vyc (have to be initialised even with no actual info)
                 pi * (2 * nprand() - 1) * 1,  # vangle
                 maxthr * nprand() * 0, maxthr * nprand() * 0, False]  # thrusts, done
        # cam = (WIDTH, HEIGHT, scale, state[0], state[1])
        score = 0
        counter = 0
        while not state[-1]:
            # start = time.time_ns() / 1e6
            for event in pg.event.get():
                if event.type == pg.QUIT:
                    T.save(drone.eval.state_dict(), f"{dronename}/drone.pt")
                    return
                if event.type == pg.KEYDOWN:
                    if event.key == pg.K_r:
                        do_render = True
                    elif event.key == pg.K_SPACE:
                        do_render = False
            observation = get_observation2(state)
            action = drone.policy(observation)
            rew, state = simstep(state, playable, action)
            score += rew
            next_observation = get_observation2(state)
            drone.store_transition(observation, action, rew, next_observation, state[-1])
            drone.learn()
            if do_render:
                cam = (WIDTH, HEIGHT, scale, state[0], max(2, state[1]))
                screen = render(state, score, screen, cam, scale, WIDTH, HEIGHT)
                pg.display.flip()
                clock.tick(60)
            if counter > maxcount:
                state[-1] = True
                print("EXCEEDED")
                maxcount += 1
            counter += 1
            # end = time.time_ns() / 1e6
            # print(f"total time: {end - start} ms")
            # print("\n\n")
        scores = np.append(scores, score)
        epss = np.append(epss, drone.eps)
        avg_score = np.mean(scores[max(0, i - 500):i + 1])
        writedata(f"{dronename}/data.csv", [i, score, avg_score])
        if not i % 50:
            print(f'episode {i}:\nscore: {score}\naverage score: {avg_score}\neps: {drone.eps}\n')
        if not i % 1000:
            x = np.arange(i + 1)
            plot_progress(x, scores, f"{dronename}/plot_{i // 1000}k.png")
        if not i % 5000:
            T.save(drone.eval.state_dict(), f"{dronename}/drone_{i // 1000}k.pt")
    drone.save(f"drone_{dronename}.pt")
    return


# Press the green button in the gutter to run the script.
if __name__ == '__main__':
    main()