(16-3)多智能体强化学习实战:Predator-Prey游戏(3)

这篇具有很好参考价值的文章主要介绍了(16-3)多智能体强化学习实战:Predator-Prey游戏(3)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

16.6  第二个环境

创建一个名为PredatorPreyEnv2的自定义强化学习环境类,用于模拟实现“捕食者-猎物”(Predator-Prey)的环境。这是一个多智能体环境,其中捕食者追逐猎物,而猎物尽量躲避被捕食。与传统的Predator-Prey环境不同,这个版本中的猎物不会永久消失,并且捕食者只能在一次行动中捕食猎物。这个环境常用于研究协作、竞争和策略选择等问题。对环境PredatorPreyEnv2的具体说明如下:

  1. Predator-Prey(捕食者-猎物):这个环境中包括两种类型的角色,一种是捕食者(Predator),另一种是猎物(Prey)。捕食者的目标是捕获猎物,而猎物的目标是躲避捕食者。
  2. Immortal Prey(不死猎物):在这个环境中,猎物是“不死的”,意味着它们不会因为被捕食而消失。通常,在传统的Predator-Prey环境中,猎物被捕食后会被移除,但在这个版本中,猎物不会被永久消失,它们可以一次又一次地被捕食。
  3. One time eat(一次性捕食):这意味着捕食者只能在一次行动中捕食到猎物,之后猎物不会再次被同一个捕食者捕食。这一规则增加了游戏的挑战,因为捕食者需要选择最佳的时机和策略来捕食猎物。

环境类PredatorPreyEnv2的具体实现流程如下所示。

(1)导入必要的Python库和模块,包括Gym,NumPy,Pygame等。具体实现代码如下所示。

import gymnasium as gym
from gymnasium import spaces
import pygame
import numpy as np
from collections import deque

(2)定义自定义Gym环境的类,该类继承自gym.Env,这是一个Gym环境的基类。具体实现代码如下所示。

class PredatorPreyEnv2(gym.Env):
    metadata = {'render.modes': ['human', 'rgb_array'],
                'render-fps': 4}

(3) 实现类PredatorPreyEnv2的构造函数,用于初始化环境的各种属性和参数。其中包括环境的大小、视野、捕食者和猎物数量、奖励设置等等。构造函数还设置了观察空间和动作空间的规格。具体实现代码如下所示。   

    def __init__(self,
                render_mode=None,
                size:int=10,
                vision:int=5,
                predator:int =3,
                prey:int =1,
                error_reward:float=-2,
                success_reward:float=10,
                living_reward:float=-1,
                img_mode:bool=False,
                episode_length:int=100,
                history_length:int=4,
                communication_bits:int=0,
                cooperate:float=1):
        self.size  = size
        self.vision = vision
        self.window_size = 500
        self.render_mode = render_mode
        self.predator_num = predator
        self.prey_num = prey
        self.active_predator = [True for i in range(self.predator_num)]
        self.active_prey = [True for i in range(self.prey_num)]
        self.error_reward = error_reward
        self.success_reward = success_reward
        self.living_reward = living_reward
        self.episode_length = episode_length
        self.img_mode = img_mode
        self.steps = 0
        self.window = None
        self.clock = None
        self.cooperate = cooperate
        self.render_scale = 1
        self.observation_space = spaces.Dict({
            'predator': spaces.Sequence(spaces.Box(0, size-1, shape=(2,), dtype=np.int32)),
            'prey': spaces.Box(0, size-1, shape=(2,), dtype=np.int32),
        })
        total_actions = 5
        self.action_space_predator = spaces.MultiDiscrete([total_actions]*predator)
        self.action_space_prey = spaces.MultiDiscrete([total_actions]*prey)
        self.single_action_space = spaces.Discrete(total_actions)
        self._action_to_direction = {
            0: np.array([0, 1]),
            1: np.array([1, 0]),
            2: np.array([0, -1]),
            3: np.array([-1, 0]),
            4: np.array([0, 0])
        }
        self.frame_history = deque(maxlen=4)
        self.history_length = history_length
        self.communication_bits = communication_bits
        if self.communication_bits>0:
            self.pred_communication = np.zeros((self.predator_num))
            self.prey_communication = np.zeros((self.prey_num))

(4)分别实现方法_get_obs(self) 和_get_np_arr_obs(self),这两个方法用于获取当前环境的观察值(状态)。第一个方法返回一个包含捕食者和猎物位置的字典,第二个方法返回一个包含捕食者和猎物位置的NumPy数组。具体实现代码如下所示。

    def _get_obs(self):
        if self.img_mode:
            return self._get_np_arr_obs()
        return {
            'predator': self._predator_location,
            'prey': self._prey_location
        }

(5)定义方法_get_info(self),用于获取环境的其他信息,通常为空字典。具体实现代码如下所示。

    def _get_np_arr_obs(self):
        predator_states = []
        prey_states = []
        for i in range(len(self._predator_location)):
            state = self._render_predator_frame(predator_id=i)
            predator_states.append(state)
        for i in range(len(self._prey_location)):
            state = self._render_prey_frame(prey_id=i)
            prey_states.append(state)
        return {
            "predator":predator_states, 
            "prey":prey_states
        }

(6)定义方法reset(self, *, seed: int=1, options=None),用于重置环境的状态,通常在每个新的回合(episode)开始时调用。它随机初始化了捕食者和猎物的位置,并返回初始观察值。具体实现代码如下所示。

    def reset(self, *, seed: int=1, options=None):
        self._predator_location = np.random.randint(0, self.size, size=(self.predator_num, 2))
        self._prey_location = np.random.randint(0, self.size, size=(self.prey_num, 2))
        self.steps = 0
        self.active_predator = [True for i in range(self.predator_num)]
        self.active_prey = [True for i in range(self.prey_num)]
        if self.render_mode == 'human':
            self._render_frame()
        self._save_frame_history()
        return self._get_frame_history(self.history_length), self._get_info()

(7)定义方法get_reward(self) 和方法_get_prey_reward(self),这两个方法用于计算捕食者和猎物的奖励。前者计算捕食者的奖励,如果捕食者成功捕获猎物,会获得正奖励,否则获得负奖励。后者计算猎物的奖励,如果猎物被捕获,奖励为0,否则为正奖励。具体实现代码如下所示。   

    def _get_reward(self):
        # if any predator reaches prey, success. else, living reward
        rewards = [self.living_reward for i in range(self.predator_num)]
        for i in range(self.predator_num):
            for j in range(self.prey_num):
                if self.active_predator[i]:
                    if np.all(self._predator_location[i]==self._prey_location[j]):
                        rewards = [self.cooperate*self.success_reward for i in range(self.predator_num)]
                        rewards[i] = self.success_reward
    #                     print("EATEN")
                        return rewards
        return rewards
    
    def _get_prey_reward(self):
        # if any predator reaches prey, success. else, living reward
        rewards = [self.success_reward for i in range(self.prey_num)]
        for i in range(self.prey_num):
            if self._prey_location[i] in self._predator_location:
                rewards[i] = 0
        return rewards

(8)定义方法_is_done(self),用于判断当前回合是否结束。如果捕食者全部死亡、达到最大步数或者所有猎物被捕获,回合结束。具体实现代码如下所示。   

    def _is_done(self):
        # if all prey are gone or episode length is reached, done
        if self.steps >= self.episode_length:
            return True
        if np.sum(self.active_predator)==0:
            return True
        return False
        if np.sum(self.active_prey) == 0:
            return True
        return False

(9)分别定义方法_is_valid_predator(self, location, index) 和方法_is_valid_prey(self, location, index),这两个方法用于检查捕食者和猎物的移动是否有效,是否超出边界或重叠。具体实现代码如下所示。

    def _is_valid_predator(self, location, index):
        # check if location is valid
        if location[0] < 0 or location[0] >= self.size or location[1] < 0 or location[1] >= self.size:
            return False
        if location in np.delete(self._predator_location, index, axis=0):
            return False
        return True
    
    def _is_valid_prey(self, location, index):
        # check if location is valid for prey of i'th index
        if location[0] < 0 or location[0] >= self.size or location[1] < 0 or location[1] >= self.size:
            return False
        if location in np.delete(self._prey_location, index, axis=0):
            return False
        return True

(10)定义方法render(self),用于渲染环境,返回渲染结果。根据不同的渲染模式,可能返回RGB数组或者在Pygame窗口中显示。具体实现代码如下所示。

    def render(self):
        if self.render_mode =='rgb_array':
            return self._render_frame()

(11)分别定义方法save_frame_history(self) 和方法_get_frame_history(self, history=4),这两个方法用于保存和获取环境的历史帧,通常用于可视化和回放。具体实现代码如下所示。

    def _save_frame_history(self):
        self.frame_history.append(self._get_obs())

    def _get_frame_history(self, history=4):
        if len(self.frame_history) < history:
            return None
        return list(self.frame_history)[-history:]

(12)定义方法step(),用于模拟智能体与环境之间的交互过程的一步,接收捕食者和猎物的动作,更新环境状态,并返回新的观察值、奖励、是否结束以及其他信息。具体实现代码如下所示。

    def step(self, action_pred, action_prey, pred_communication=None, prey_communication=None):

        if self._is_done():
            raise RuntimeError("Episode is done")
        self.steps += 1
        # move predator
        for i in range(self.predator_num):
            if i < len(action_pred):
                action = action_pred[i]
            else:
                action = self.single_action_space.sample()
            new_location = self._predator_location[i] + self._action_to_direction[action]
            if self._is_valid_predator(new_location, i):
                self._predator_location[i] = new_location

        # move prey
        for i in range(self.prey_num):
            if self.active_prey[i] == False:  # if prey is dead,
                continue
            if i < len(action_prey):
                action = action_prey[i]
            else:
                action = self.single_action_space.sample()
            
            new_location = self._prey_location[i] + self._action_to_direction[action]
            if self._is_valid_prey(new_location, i):
                self._prey_location[i] = new_location

        # check if any predator reaches prey and give reward
        pred_reward = self._get_reward()
        prey_reward = self._get_prey_reward()
        for i in range(self.predator_num):
            for j in range (self.prey_num):
                if np.all(self._predator_location[i] == self._prey_location[j]):
                    self.active_predator[i] = False
        
        #save communication of agents
        if self.communication_bits > 0:
            self.pred_communication = pred_communication
            self.prey_communication = prey_communication

        done = self._is_done()
        reward = {
            'predator': pred_reward,
            'prey': prey_reward
        }
        if self.render_mode == 'human':
            self._render_frame()
        self._save_frame_history()
        return self._get_frame_history(self.history_length), reward, done, self._get_info()

上述代码的实现流程如下所示:

  1. 接收智能体的动作:step() 方法接收智能体的动作作为输入参数。这些动作决定了智能体在当前时间步骤要采取的行动。
  2. 更新环境状态:根据智能体的动作,step() 方法会更新环境的内部状态,包括智能体的位置、奖励分配等等。这个更新可能包括移动智能体、改变环境状态等。
  3. 计算奖励:step() 方法会根据当前的环境状态和智能体的动作计算奖励。奖励表示智能体在执行该动作后的表现好坏,通常是一个数值。
  4. 判断是否结束:方法还会检查当前回合是否结束,可能的结束条件包括达到最大步数、任务成功完成或失败等。
  5. 返回结果:step() 方法会返回一个包含以下信息的元组:新的观察值(环境状态)、奖励、是否结束、其他信息。这些信息通常被用于智能体的学习和决策。

总之,step()方法是智能体与环境互动的核心,它模拟了一个时间步骤中的所有操作,允许智能体与环境进行连续的交互,以便智能体能够学习并改进其策略以达到某个目标。

(13)分别定义方法_render_predator_frame(self, predator_id:int=0) 和方法_render_prey_frame(self, prey_id:int=1),这两个方法用于渲染捕食者和猎物的图像帧,根据传入的捕食者或猎物的ID,在图像中绘制相应的位置信息。具体实现代码如下所示。        

    def _render_predator_frame(self, predator_id:int=0):

        if predator_id==None:
            return
        frame = np.zeros((4, self.vision, self.vision), dtype=np.uint8)
        # draw predator
        pred_loc = self._predator_location[predator_id]
        min_pred_loc = pred_loc - np.array([self.vision//2, self.vision//2])
        max_pred_loc = pred_loc + np.array([self.vision//2, self.vision//2])

        # add predator to centre of frame
        frame[1, self.vision//2, self.vision//2] = self.active_predator[predator_id]

        # for each predator or prey within min and max it will be added in the frame
        for i in range(self.predator_num):
            if i==predator_id:
                continue
            if (min_pred_loc[0] <= self._predator_location[i][0] <= max_pred_loc[0] 
            and 
            min_pred_loc[1] <= self._predator_location[i][1] <= max_pred_loc[1]):
                loc_x = self._predator_location[i][0]-min_pred_loc[0]
                loc_y = self._predator_location[i][1]-min_pred_loc[1]
#                 frame[2, loc_x, loc_y] = self.render_scale 
                frame[2, loc_x, loc_y] = int(self.active_predator[i]) 
                # frame[2, loc_x, loc_y] = self.pred_communication[i]
                if self.communication_bits > 0:
                    frame[3, loc_x, loc_y] = self.pred_communication[i]
        
        
        for i in range(self.prey_num):
            if (min_pred_loc[0] <= self._prey_location[i][0] <= max_pred_loc[0] 
            and 
            min_pred_loc[1] <= self._prey_location[i][1] <= max_pred_loc[1]):
                loc_x = self._prey_location[i][0]-min_pred_loc[0]
                loc_y = self._prey_location[i][1]-min_pred_loc[1]
                frame[0, loc_x, loc_y] = self.render_scale 
                
        # create white for cells outside grid
        if min_pred_loc[0] < 0:
            frame[:, :abs(min_pred_loc[0]), :] = self.render_scale 
        if max_pred_loc[0] >= self.size:
            frame[:, -(max_pred_loc[0]-self.size+1):, :] = self.render_scale 
        if min_pred_loc[1] < 0:
            frame[:, :, :abs(min_pred_loc[1])] = self.render_scale 
        if max_pred_loc[1] >= self.size:
            frame[:, :, -(max_pred_loc[1]-self.size+1):] = self.render_scale 
        
        return frame

    def _render_prey_frame(self, prey_id:int=1):
        if prey_id==None:
            return
        frame = np.zeros((3, self.vision, self.vision), dtype=np.uint8)
        # draw prey
        prey_loc = self._prey_location[prey_id]
        min_prey_loc = prey_loc - np.array([self.vision//2, self.vision//2])
        max_prey_loc = prey_loc + np.array([self.vision//2, self.vision//2])

        # add prey to centre of frame
        frame[1, self.vision//2, self.vision//2] = self.render_scale 
        # for each predator or prey within min and max it will be added in the frame
        for i in range(self.predator_num):
            if (min_prey_loc[0] <= self._predator_location[i][0] <= max_prey_loc[0] 
            and 
            min_prey_loc[1] <= self._predator_location[i][1] <= max_prey_loc[1]):
                frame[2, self._predator_location[i][0]-min_prey_loc[0], self._predator_location[i][1]-min_prey_loc[1]] = self.render_scale 
        
        for i in range(self.prey_num):
            if (min_prey_loc[0] <= self._prey_location[i][0] <= max_prey_loc[0] 
            and 
            min_prey_loc[1] <= self._prey_location[i][1] <= max_prey_loc[1]):
                frame[0, self._prey_location[i][0]-min_prey_loc[0], self._prey_location[i][1]-min_prey_loc[1]] = self.render_scale 
        
        # create white for cells outside grid
        if min_prey_loc[0] < 0:
            frame[:, :abs(min_prey_loc[0]), :] = self.render_scale 
        if max_prey_loc[0] >= self.size:
            frame[:, -(max_prey_loc[0]-self.size+1):, :] = self.render_scale 
        if min_prey_loc[1] < 0:
            frame[:, :, :abs(min_prey_loc[1])] = self.render_scale 
        if max_prey_loc[1] >= self.size:
            frame[:, :, -(max_prey_loc[1]-self.size+1):] = self.render_scale 
        
        return frame

(14)定义方法_render_frame(self),用于渲染整个环境的图像帧,包括网格、捕食者和猎物的位置。具体实现代码如下所示。  

    def _render_frame(self):

        if self.window is None and self.render_mode == 'human':
            pygame.init()
            pygame.display.init()
            self.window = pygame.display.set_mode((self.window_size, self.window_size))
            self.window = pygame.display.set_mode((self.window_size, self.window_size))
        if self.clock is None and self.render_mode == 'human':
            self.clock = pygame.time.Clock()

        canvas = pygame.Surface((self.window_size, self.window_size))
        canvas.fill((255, 255, 255))
        pixel_size = self.window_size // self.size

        # draw grid
        for i in range(self.size):
            pygame.draw.line(canvas, (0, 0, 0), (0, i*pixel_size), (self.window_size, i*pixel_size))
            pygame.draw.line(canvas, (0, 0, 0), (i*pixel_size, 0), (i*pixel_size, self.window_size))
        
        # draw prey as rectangle
        for i in range(self.prey_num):
            if self.active_prey[i]:
                pygame.draw.rect(canvas, (255, 0, 0), (self._prey_location[i][1]*pixel_size, self._prey_location[i][0]*pixel_size, pixel_size, pixel_size))

        # draw predator as circle
        for i in range(self.predator_num):
            if self.active_predator[i]:
                pygame.draw.circle(canvas, (0, 0, 255), (self._predator_location[i][1]*pixel_size+pixel_size//2, self._predator_location[i][0]*pixel_size+pixel_size//2), pixel_size//2)
        
        
        if self.render_mode == 'human':
            self.window.blit(canvas, canvas.get_rect())
            pygame.event.pump()
            pygame.display.update()

            self.clock.tick(self.metadata['render-fps'])
        else:
            return np.transpose(pygame.surfarray.array3d(canvas), (1, 0, 2))

(15)定义方法close(self),用于关闭渲染窗口,释放资源。具体实现代码如下所示。文章来源地址https://www.toymoban.com/news/detail-861248.html

    def close(self):
        if self.window is not None:
            pygame.quit()
            self.window = None
            self.clock = None

未完待续

到了这里,关于(16-3)多智能体强化学习实战:Predator-Prey游戏(3)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 游戏中的AI与游戏可玩性:基于强化学习的游戏AI

    作者:禅与计算机程序设计艺术 游戏领域近年来由于人工智能的兴起而得到了极大的关注,其中最具代表性的就是游戏AI(Artificial Intelligence for Game)。AI在游戏中是一个与生俱来的特性,它赋予了游戏不同的灵活性、活跃性和新意。比如在任天堂的超级马里奥或是3D游戏《

    2024年02月08日
    浏览(22)
  • 【强化学习】----训练Flappy Bird小游戏

    1.1 游戏简介 Flappy Bird游戏需要玩家控制一只小鸟越过管道障碍物。玩家只可以进行“跳跃”或者“不操作”两种操作,即点或不点。点则让小鸟上升一段距离,不点小鸟继续下降。若小鸟碰到障碍物或地面,则游戏失败。 如今,深度学习通过组合低层特征形成更加抽象的高

    2024年02月11日
    浏览(49)
  • AI强化学习初探——卡牌游戏星际争霸II

    作者:禅与计算机程序设计艺术 在AI技术领域,强化学习(Reinforcement Learning)是最具代表性的一种机器学习方法。其核心思想是通过反馈机制让智能体(Agent)不断修正策略,使得它不断地按照既定目标策略进行行动,以达到最优状态的优化。常见的强化学习算法如Q-learnin

    2024年02月07日
    浏览(22)
  • 人工智能课程笔记(7)强化学习(基本概念 Q学习 深度强化学习 附有大量例题)

    强化学习和深度学习都是机器学习的分支,但是两者在方法和应用场景上有所不同。 强化学习 : 强化学习概述 :强化学习是一种通过智能体与环境进行交互来学习最优行动策略的算法。在强化学习中,智能体与环境不断交互,观察环境的状态并采取不同的行动,从而获得奖

    2024年01月17日
    浏览(32)
  • 【强化学习入门】二.强化学习的基本概念:状态、动作、智能体、策略、奖励、状态转移、轨迹、回报、价值函数

    超级玛丽游戏中,观测到的这一帧画面就是一个 状态(State) 。 玛丽做的动作:向左、向右、向上即为 动作(Action) 。 动作是由谁做的,谁就是 智能体(Agent) 。自动驾驶中,汽车就是智能体;机器人控制中,机器人就是智能体;超级玛丽游戏中,玛丽就是智能体。 策略

    2024年02月03日
    浏览(22)
  • 强化学习代码实战(3) --- 寻找真我

          本文内容来自于南京大学郭宪老师在博文视点学院录制的视频,课程仅9元地址,配套书籍为深入浅出强化学习 编程实战 郭宪地址。       我们发现多臂赌博机执行一个动作之后,无论是选择摇臂1,摇臂2,还是摇臂3之后都会返回原来的状态,也就是说它的状态并没

    2024年02月08日
    浏览(26)
  • 【人工智能】— 学习与机器学习、无/有监督学习、强化学习、学习表示

    贝叶斯网络提供了一个自然的表示方式,用于描述(因果引起的)条件独立性。 拓扑结构 + 条件概率表 = 联合分布的紧凑表示。 通常易于领域专家构建。 通过变量消除进行精确推断: 在有向无环图上的时间复杂度是多项式级别的,但在一般图上为 NP-hard。 空间复杂度与时间

    2024年02月07日
    浏览(46)
  • 【Python】强化学习:原理与Python实战

    搞懂大模型的智能基因,RLHF系统设计关键问答   RLHF(Reinforcement Learning with Human Feedback,人类反馈强化学习)虽是热门概念,并非包治百病的万用仙丹。本问答探讨RLHF的适用范围、优缺点和可能遇到的问题,供RLHF系统设计者参考。 📕作者简介: 热爱跑步的恒川 ,致力于

    2024年02月11日
    浏览(22)
  • 通用人工智能技术(深度学习,大模型,Chatgpt,多模态,强化学习,具身智能)

    目录 前言 1.通用人工智能 1.1 生物学分析 1.2具身智能 1.2.1当前的人工智能的局限 1.2.2 具身智能实现的基础 1.2.3 强化学习(决策大模型) 2.结论 往期文章 参考文献       目前的人工智能实质上只是强人工智能,或者说单个领域的通用人工智能。比方说Chatgpt它属于自然语言

    2024年02月07日
    浏览(43)
  • 通用人工智能之路:什么是强化学习?如何结合深度学习?

    2015年, OpenAI 由马斯克、美国创业孵化器Y Combinator总裁阿尔特曼、全球在线支付平台PayPal联合创始人彼得·蒂尔等硅谷科技大亨创立,公司核心宗旨在于 实现安全的通用人工智能(AGI) ,使其有益于人类。 ChatGPT 是 OpenAI 推出的一个基于对话的原型 AI 聊天机器人,2022年12 月 1

    2024年02月16日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包