找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 196|回复: 0

[教程] RenPyUtil:ren_communicator进阶

[复制链接]
发表于 2023-11-10 22:55:50 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
本帖最后由 ZYKsslm 于 2024-3-24 02:59 编辑

RenPyUtil:ren_communicator进阶

Github同步项目 -> https://github.com/ZYKsslm/RenPyUtil

这几天又重新构思了一下代码,线程问题在github上的renpy项目提了issue -> https://github.com/renpy/renpy/issues/5105,最终取了一些折中的方案。

源码如下:

[RenPy] 纯文本查看 复制代码
# 此文件提供了一系列基于Ren'Py的功能类,以供Ren'Py开发者调用
# 作者  ZYKsslm
# 仓库  [url=https://github.com/ZYKsslm/RenPyUtil]https://github.com/ZYKsslm/RenPyUtil[/url]
# 声明  该源码使用 MIT 协议开源,但若使用需要在程序中标明作者消息


"""renpy
init -1 python:
"""


from datetime import datetime
import socket
import pickle
import os


class Message(object):
    """消息类,用于定义通信中收发的消息对象。"""

    STRING = "string"
    FILE = "file"
    IMAGE = "image"
    VOICE = "voice"
    VIDEO = "video"

    def __init__(self, message: str, _type="string"):
        """初始化方法。

        Arguments:
            message -- 可能为一个纯字符串的消息,或一个文件的路径。
            _type -- 文件类型。应为`Message.STRING` `Message.IMAGE` `Message.VOICE` `Message.VIDEO` `Message.FILE`其中一项。
        """        
        
        self.type = _type
        self.message = message
        
        if self.type == Message.STRING:
            self.data = self.message.encode("utf-8")
            self.format = None
        else:
            if not os.path.exists(self.message):
                raise Exception(f"找不到该文件:{self.message}")
            
            self.format = os.path.splitext(self.message)[1]
            
            with open(self.message, "rb") as f:
                self.data = f.read()
        
        self.info = pickle.dumps(
            {
                "message": self.message,
                "data": self.data,
                "type": self.type,
                "format": self.format
            }
        )


class Prompt(object):
    """命令类。"""        

    def __init__(self, *prompts):
        prompts = list(prompts)
        for a in range(len(prompts)):
            prompts[a] = prompts[a].encode("utf-8")
        
        # 转换为一个集合
        self.prompts = set(prompts)
        
        
class _HistoryEntry(object):
    def __init__(self, kind, who, what, who_args, what_args, window_args, show_args):
        self.kind = kind
        self.who = who
        self.what = what
        self.what_args = what_args
        self.who_args = who_args
        self.window_args = window_args
        self.show_args = show_args


class RenServer(object):
    """该类为一个服务端类。基于socket进行多线程通信。

    在于子线程中运行的方法中使用renpy某些更新屏幕的函数(如`renpy.say()`等),可能会引发异常。

    在子线程中运行的方法有:
        1. 使用`set_prompt`设定的命令方法。
        2. 使用`set_receive_event` `set_conn_event` `set_disconn_event`设定的事件方法。
        3. 所有进行通信的方法。
    """

    # 支持监听的事件
    EVENT = [
        "PROMPT",   # 命令事件
        "DISCONN",    # 断连事件
        "CONNECT",  # 连接事件
        "RECEIVE",  # 接收事件
    ]

    PROMPT_EVENT = "PROMPT"
    DISCONN_EVENT = "DISCONN"
    CONNECT_EVENT = "CONNECT"
    RECEIVE_EVENT = "RECEIVE"
    

    def __init__(self, max_conn=5, max_data_size=104857600, ip="0.0.0.0", port=8888, history=False, character=None):
        """初始化方法。

        Keyword Arguments:
            max_conn -- 最大连接数。 (default: {5})
            max_data_size -- 接收数据的最大大小。默认为100M。 (default: {104857600})
            port -- 端口号。 (default: {None})
            history -- 接收到的文字消息是否显示在历史记录中。 (default: {False})
            character -- 若`history`参数为True,则该参数应为一个角色对象,用于保存在历史记录中。 (default: {None})
        """            

        self.port = port
        self.ip = ip
        self.max_data_size = max_data_size
        self.max_conn = max_conn
        self.history = history
        self.character = character
        
        self.bind()

        self.client_socket_list = []
        self.has_opened = False
        self.current_prompt = None
        self.received = False
        self.reply = None

        self.prompt_dict = {}
        self.disconn_event = []
        self.receive_event = []
        self.conn_event = []

        self.log = {}

    def set_prompt(self, prompt: str | list | Prompt, func, *args, **kwargs):
        """调用该方法,创建一个命令,当接收到该命令后执行绑定的函数。命令将作为第一个参数,客户端socket将作为第二个参数传入指定函数中。

        不定参数为绑定的函数参数。

        Arguments:
            prompt -- 命令语句。可为一个字符串或一个列表或一个Prompt对象,若为列表,则列表中所有命令都可触发命令。
            func -- 一个函数。
        """ 

        if not isinstance(prompt, Prompt):
            prompt = Prompt(prompt)
        
        self.prompt_dict[prompt] = [func, args, kwargs]

    def set_reply(self, reply: str | Message):
        """调用该方法,指定接收到消息后自动回复的消息。

        Arguments:
            reply -- 要回复的消息。
        """       
        
        if not isinstance(reply, Message):
            reply = Message(reply)

        self.reply = reply

    def set_disconn_event(self, func, *args, **kwargs):
        """调用该方法,指定当断开连接时的行为。断连的主机名称(一个元组)将作为第一个参数传入指定函数中。

        不定参数为绑定的函数参数。

        Arguments:
            func 一个函数
        """            

        self.disconn_event = [func, args, kwargs]

    def set_receive_event(self, func, *args, **kwargs):
        """调用该方法,指定当接受到消息时的行为。接收到的数据将作为第一个参数,客户端socket将作为第二个参数传入指定函数中。

        不定参数为绑定的函数参数。

        Arguments:
            func -- 一个函数。
        """      

        self.receive_event = [func, args, kwargs]

    def set_conn_event(self, func, *args, **kwargs):
        """调用该方法,指定当客户端连接后的行为。客户端socket将作为第一个参数传入指定函数中。

        不定参数为绑定的函数参数。

        Arguments:
            func -- 一个函数。
        """            

        self.conn_event = [func, args, kwargs]
    
    def close(self):
        """调用该方法,关闭服务端。"""

        self.socket.close()     

    def close_all_conn(self):
        """调用该方法,关闭所有连接。"""   

        if self.client_socket_list:
            for i in range(len(self.client_socket_list)):
                socket = self.client_socket_list[i]
                self.close_a_conn(socket)

    def close_a_conn(self, client_socket: socket.socket=None):
        """调用该方法,关闭一个指定socket连接。

        Keyword Arguments:
            socket -- 客户端socket。若该参数不填,则关闭最新的连接。 (default: {None})
        """    

        try:
            if client_socket:
                client_socket.close()
                self.client_socket_list.remove(client_socket)
            else:
                index = len(self.client_socket_list)-1
                socket = self.client_socket_list[index]
                socket.close()
                self.client_socket_list.remove(socket) 
        except:
            pass

    def listen_event(self, event, tip="", prompt: Prompt = None):
        """调用该方法阻塞式监听一个事件,监听到事件后取消阻塞。

        Arguments:
            event -- 事件类型。

        Keyword Arguments:
            prompt -- 若为命令事件,则该参数为一个Prompt对象。 (default: {None})
            tip -- renpy提示界面的内容。 (default: {None})
        """            

        if event == RenServer.PROMPT_EVENT:
            while prompt != self.current_prompt:
                renpy.notify(tip)
                renpy.pause()

        elif event == RenServer.DISCONN_EVENT:
            event_counter = len(self.log)
            while len(self.log) <= event_counter:
                renpy.notify(tip)
                renpy.pause()

        elif event == RenServer.CONNECT_EVENT:
            event_counter = len(self.client_socket_list)
            while len(self.client_socket_list) <= event_counter:
                renpy.notify(tip)
                renpy.pause()

        elif event == RenServer.RECEIVE_EVENT:
            while not self.received:
                renpy.notify(tip)
                renpy.pause()

        else:
            return
    
    def bind(self):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        self.socket.bind((self.ip, self.port))
        self.has_opened = True
    
    def run(self):
        """调用该方法,开始监听端口,创建连接线程。"""            
        
        try:
            self.socket.listen(self.max_conn)
        except socket.error:
            self.bind()
            self.socket.listen(self.max_conn)

        renpy.invoke_in_thread(self._accept)

    def reboot(self, log_clear=False):
        """调用该方法将重新开始通信。

        Keyword Arguments:
            log_clear -- 若为True,将清除日志。 (default: {False})
        """            

        self.bind()
        if log_clear:
            self.log.clear()
        self.run()

    def _accept(self):
        """该方法用于创建连接线程,用于类内部使用,不应被调用。"""            

        while True:
            client_socket = self.socket.accept()[0]
    
            self.client_socket_list.append(client_socket)
            renpy.invoke_in_thread(self._receive, client_socket, self.max_data_size)

            if self.conn_event:
                func, args, kwargs = self.conn_event
                renpy.invoke_in_thread(func, client_socket, *args, **kwargs)

    def _receive(self, client_socket: socket.socket, max_data_size):
        """该方法用于接收线程使用,处理接收事件,用于类内部使用,不应被调用。"""
            
        while True:
            self.received = False
            try:
                data = client_socket.recv(max_data_size)
                self.received = True
            except socket.error as err:
                self.log[datetime.now().strftime(r"%Y-%m-%d %H:%M:%S")] = err
                self.client_socket_list.remove(client_socket)

                if self.disconn_event:
                    func, args, kwargs = self.disconn_event
                    renpy.invoke_in_thread(func, client_socket.getpeername(), *args, **kwargs)

                client_socket.close()
                return
            
            try:
                info = pickle.loads(data)
            except pickle.PickleError as e:
                self.log[datetime.now().strftime(r"%Y-%m-%d %H:%M:%S")] = f"接受失败!数据可能超过最大接收大小:{e}"
                continue
            data = info["data"]
            
            for prompt in self.prompt_dict.keys():
                if data in prompt.prompts:
                    func, args, kwargs = self.prompt_dict[prompt]
                    self.current_prompt = prompt
                    renpy.invoke_in_thread(func, data, client_socket, *args, **kwargs)

            if self.reply:
                self.send(client_socket, self.reply)

            if self.receive_event:
                func, args, kwargs = self.receive_event
                renpy.invoke_in_thread(func, info, client_socket, *args, **kwargs)
               
            if self.history and info["type"] == Message.STRING: 
                history_obj = _HistoryEntry(
                    kind="adv",
                    who=self.character.name,
                    what=info["message"],
                    who_args=self.character.who_args,
                    what_args=self.character.what_args,
                    window_args=self.character.window_args,
                    show_args=self.character.show_args
                )
                _history_list.append(history_obj)

    def send(self, client_socket: socket.socket, msg: Message):
        """调用该方法,向指定客户端发送消息。该方法为阻塞方法。

        Arguments:
            client_socket -- 客户端socket。

        Keyword Arguments:
            msg -- 要发送的消息。

        Returns:
            若返回值为True,则发送消息成功;若为False则失败。
        """            
        
        try:
            client_socket.send(msg.info)
        except socket.error as err:
            self.log[datetime.now().strftime(r"%Y-%m-%d %H:%M:%S")] = err
            
            try:
                self.client_socket_list.remove(client_socket)
            except:
                pass

            if self.disconn_event:
                func, args, kwargs = self.disconn_event
                renpy.invoke_in_thread(func, client_socket.getpeername(), *args, **kwargs)

            client_socket.close()
            return False
        else:
            return True

    def __enter__(self):
        # 进入with语句后执行的方法
        # 禁止了一些功能防止造成与线程有关的异常
        # 禁止回滚
        # 禁止自动存档
        # 禁止用户存档
        # 禁止用户跳过
        config.rollback_enabled = False
        config.allow_skipping = False
        config.has_autosave = False
        renpy.block_rollback()
        
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # 当退出with语句后恢复禁用的功能
        config.rollback_enabled = True
        config.allow_skipping = True
        config.has_autosave = True
        renpy.block_rollback()


class RenClient(object):
    """该类为一个客户端类。
    
    在于子线程中运行的方法中使用renpy更新屏幕的函数(如`renpy.say()`等),可能会引发异常。

    在子线程中运行的方法有:
        1. 使用`set_prompt`设定的命令方法。
        2. 使用`set_receive_event` `set_conn_event` `set_disconn_event`设定的事件方法。
        3. 所有进行通信的方法。
    """

    # 支持监听的事件
    EVENT = [
        "PROMPT",   # 命令事件
        "DISCONN",    # 断连事件
        "CONNECT",  # 连接事件
        "RECEIVE",  # 接收事件
    ]

    PROMPT_EVENT = "PROMPT"
    DISCONN_EVENT = "DISCONN"
    CONNECT_EVENT = "CONNECT"
    RECEIVE_EVENT = "RECEIVE"

    
    def __init__(self, target_ip, target_port, max_data_size=104857600, history=False, character=None):
        """初始化方法。

        Arguments:
            target_ip -- 服务端IP。
            target_port -- 服务端端口。

        Keyword Arguments:
            max_data_size -- 接收数据的最大大小。默认为100M。 (default: {104857600})
            history -- 接收到的文字消息是否显示在历史记录中。 (default: {False})
            character -- 若`history`参数为True,则该参数应为一个角色对象,用于保存在历史记录中。 (default: {None})
        """                       

        self.target_ip = target_ip
        self.target_port = target_port
        self.max_data_size = max_data_size
        self.history = history
        self.character = character
        
        self.bind()

        self.is_conn = False
        self.has_opened = False
        self.received = False
        self.current_prompt = None
        self.reply = None

        self.prompt_dict = {}
        self.disconn_event = []
        self.receive_event = []
        self.conn_event = []

        self.log = {}

    def set_prompt(self, prompt: str | set, func, *args, **kwargs):
        """调用该方法,创建一个命令,当服务端发送该命令后执行绑定的函数。命令将作为第一个参数传入指定函数中。

        不定关键字参数为函数参数。

        Arguments:
            prompt -- 命令语句。可为一个字符串或一个集合,若为集合,则集合中所有语句都可触发命令。
            func -- 一个函数。
        """ 

        if not isinstance(prompt, Prompt):
            prompt = Prompt(prompt)
        
        self.prompt_dict[prompt] = [func, args, kwargs]

    def set_reply(self, reply: str | Message):
        """调用该方法,指定接收到消息后自动回复的消息。

        Arguments:
            reply -- 要回复的消息。
        """       
        
        if not isinstance(reply, Message):
            reply = Message(reply)

        self.reply = reply

    def set_disconn_event(self, func, *args, **kwargs):
        """调用该方法,指定当断开连接时的行为。

        不定参数为绑定的函数参数。

        Arguments:
            func 一个函数
        """            

        self.disconn_event = [func, args, kwargs]

    def set_receive_event(self, func, *args, **kwargs):
        """调用该方法,指定当接受到消息时的行为。接收到的数据将作为第一个参数传入指定函数中。

        不定关键字参数为函数参数。

        Arguments:
            func -- 一个函数。
        """            

        self.receive_event = [func, args, kwargs]

    def set_conn_event(self, func, *args, **kwargs):
        """调用该方法,指定成功连接服务端后的行为。

        不定关键字参数为函数参数。

        Arguments:
            func -- 一个函数。
        """            

        self.conn_event = [func, args, kwargs]

    def listen_event(self, event, tip="", prompt=None):
        """调用该方法阻塞式监听一个事件,监听到事件后取消阻塞。

        Arguments:
            event -- 事件类型。

        Keyword Arguments:
            prompt -- 若为命令事件,则该参数为一个Prompt对象。 (default: {None})
            tip -- renpy提示界面的内容。 (default: {None})
        """            

        if event == RenClient.PROMPT_EVENT:
            while prompt != self.current_prompt:
                renpy.notify(tip)
                renpy.pause()

        elif event == RenClient.DISCONN_EVENT:
            event_counter = len(self.log)
            while len(self.log) <= event_counter:
                renpy.notify(tip)
                renpy.pause()

        elif event == RenClient.CONNECT_EVENT:
            while not self.is_conn:
                renpy.notify(tip)
                renpy.pause()

        elif event == RenClient.RECEIVE_EVENT:
            while not self.received:
                renpy.notify(tip)
                renpy.pause()

        else:
            return
        
    def bind(self):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.has_opened = True

    def run(self):
        """调用该方法,开始尝试连接服务端。"""            
        
        renpy.invoke_in_thread(self._connect)

    def close(self):
        """调用该方法,关闭客户端。"""

        self.socket.close()   

    def reboot(self, log_clear=False):
        """调用该函数重新开始通信。"""

        self.close()
        if log_clear:
            self.log.clear()
        self.run()  
    
    def reconn(self):
        """调用该方法,尝试重新连接。"""
        
        self.socket.close()
        self.bind()
        self._connect()        

    def _connect(self):
        """该方法用于创建连接线程,用于类内部使用,不应被调用。"""

        while True:
            try:
                self.socket.connect((self.target_ip, self.target_port))
                self.is_conn = True
            except socket.error:
                continue
            else:
                break

        if self.conn_event:
            func, args, kwargs = self.conn_event
            renpy.invoke_in_thread(func, *args, **kwargs)
        
        self._receive()

    def _receive(self):
        """该方法用于接收线程使用,处理接收事件,用于类内部使用,不应被调用。"""
        
        while True:
            self.received = False
            try:
                data = self.socket.recv(self.max_data_size)
                self.received = True
            except socket.error as err:
                self.received = False
                self.log[datetime.now().strftime(r"%Y-%m-%d %H:%M:%S")] = err
                self.is_conn = False

                if self.disconn_event:
                    func, args, kwargs = self.disconn_event
                    renpy.invoke_in_thread(func, *args, **kwargs)

                self.reconn()
                return
            
            try:
                info = pickle.loads(data)
            except pickle.PickleError as e:
                self.log[datetime.now().strftime(r"%Y-%m-%d %H:%M:%S")] = f"接受失败!数据可能超过最大接收大小:{e}"
                continue
            data = info["data"]

            for prompt in self.prompt_dict.keys():
                if data in prompt.prompts:
                    func, args, kwargs = self.prompt_dict[prompt]
                    self.current_prompt = prompt
                    renpy.invoke_in_thread(func, data, *args, **kwargs)

            if self.reply:
                self.send(self.reply)

            if self.receive_event:
                func, args, kwargs = self.receive_event
                renpy.invoke_in_thread(func, info, *args, **kwargs)     
                
            if self.history and info["type"] == Message.STRING: 
                history_obj = _HistoryEntry(
                    kind="adv",
                    who=self.character.name,
                    what=info["message"],
                    who_args=self.character.who_args,
                    what_args=self.character.what_args,
                    window_args=self.character.window_args,
                    show_args=self.character.show_args
                )
                _history_list.append(history_obj)        

    def send(self, msg: Message):
        """调用该方法,向指定客户端发送消息。该方法为阻塞方法。

        Arguments:
            msg -- 要发送的消息。

        Returns:
            若为True则发送成功;若为False则发送失败。
        """                     
        
        try:
            self.socket.send(msg.info)
        except socket.error as err:
            self.log[datetime.now().strftime(r"%Y-%m-%d %H:%M:%S")] = err

            if self.disconn_event:
                func, args, kwargs = self.disconn_event
                renpy.invoke_in_thread(func, *args, **kwargs)

            return False
        else:
            return True

    def __enter__(self):
        config.rollback_enabled = False
        config.allow_skipping = False
        config.has_autosave = False
        renpy.block_rollback()
        
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        config.rollback_enabled = True
        config.allow_skipping = True
        config.has_autosave = True
        renpy.block_rollback()



注意:现在的代码应保存在ren_communicator_ren.py文件中!

下面来谈谈一些要点:
以下,我把RenServer和RenClient统称为RenCommunicator


1. 如果你在一个运行于子线程中的方法中(比如使用set_prompt定义的命令事件方法)使用renpy的一些更新屏幕的语句(比如renpy.say(),renpy.call_screen()等),可能会引发异常。原因是那些语句只被允许运行于主线程当中。
2. 在定义一个RenCommunicator对象或使用其通信方法(如send(),run()等)请务必使用python中的with语句,这样可以防止用户回滚游戏导致重复创建启动线程或重复发送消息引发严重异常
3. 我在RenCommunictor中使用了event(事件)机制,可以通过监听这些事件或绑定函数来实现一些功能。RenCommunicator.EVENT 是一个列表,里面是目前支持的事件。
4. 使用RenCommunicator的 listen_event 方法来阻塞式监听一个事件,但是它不会让游戏卡死(出现未响应的情况),当事件发生后将取消阻塞,即继续执行后面的代码。
5. 同时,我还写了一个 Prompt 类,它是一个命令类。当你使用RenCommunictor对象的set_prompt或listen_event方法来创建一个命令或监听一个命令事件时,你应将Prompt对象传入。
6. 最后一点就是关于数据的收发问题了。发送的内容应始终为一个Message对象。Message类能够使用pickle模块将各种数据打包传输。Message类包含四种枚举类型:STRING、IMAGE、MOVIE、VOICE、FILE,表示传输的数据的类型。
7. 新增功能:自动将接收到的文字消息添加到历史记录中。若在定义一个RenCommunicator对象时history参数为True,则character参数应为一个角色对象,用于提取在历史记录中显示的样式。



暂时还没想到其他问题,有问题欢迎随时下方留言(有时可能会注意不到),也可以发邮箱。





您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|RenPy中文空间 ( 苏ICP备17067825号|苏公网安备 32092302000068号 )

GMT+8, 2024-5-15 00:53 , Processed in 0.079527 second(s), 24 queries .

Powered by Discuz! X3.5

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表