跳转至

构建服务器

构建服务器

如果您想让您的账号在社区上充当一个机器人的角色,那么server模块就是专门为你而开发。
icodeapi提供了server模块以构建有道小图灵的账号机器人,可以做到监视某些数据并执行回调函数。

该模块在1.0.3版本被加入。

登录

使用server.IcodeServer类构建一个服务器。

class IcodeServer:
    '''
    Build your icode bot by AsyncIcodeAPI

    learn more:https://xbzstudio.github.io/icodeapi/docs/Server.html/
    '''
    __api : AsyncIcodeAPI
    __events : list[Awaitable]
    __tocomment : list[Awaitable]
    __tolike : list[Awaitable]
    __lastData : Any
    __running : bool
    debug : bool
    ban : set[str]
    results : list[list[list]]

    def __init__(self, api : AsyncIcodeAPI, ban : set[str] = set(), debug : bool = True) -> None:
        self.__api = api
        self.__events = []
        self.__tocomment = []
        self.__tolike = []
        self.__running = False
        self.ban = ban
        self.debug = debug
        self.results = []
        self.__lastData = {}

__api : 服务器发出请求使用的AsyncIcodeAPI对象
__events : 服务器需要监听的事件列表
__tocomment : 服务器需要进行执行的评论操作列表
__tolike : 服务器需要进行执行的点赞操作列表
__lastData : 服务器上一次获取的数据(每个监听事件)
__running : 服务器是否正在运行
ban : 服务器操作黑名单集合
debug : 服务器是否开启调试模式
results : 服务器运行结果列表

__events, __tocomment, __toloke, __lastData__running这几个私有属性是及其低级的属性,请使用更高级的api,而不是直接使用它们。
示例

from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
SERVER.debug = False # 关闭debug模式

@SERVER.CheckMessage() # 注册新的监听事件
def checkmessage(content : str) -> None:
    print('New Message: ', content)

server.RunServer(server = SERVER) # 运行服务器
这段代码会监测任何消息中心中的新消息,一旦有新消息就回调checkmessage
注意:
在运行这段示例代码时,如果用相同的账号给自己评论,不会输出任何内容。因为IcodeServer自动将自己加入黑名单。

运行服务器

如果想要开始监听所有已注册事件,可以使用server.RunServer函数。

def RunServer(server : IcodeServer) -> None

server : 需要运行的服务器对象

server.IcodeServer中也提供了一个异步的运行方法。

async def run(self)
但在运行前,需要对服务器对象进行登录,在运行结束后,需要关闭服务器。而server.RunServer会自动执行这些操作,这也是serevr.RunServer的便捷之处。
示例:
from icodeapi import server
from icodeapi import AsyncIcodeAPI
import asyncio
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
SERVER.debug = False # 关闭服务器调试模式

@SERVER.CheckMessage() # 注册新的监听事件
def checkmessage(content : str) -> None:
    print('New Message: ', content)

async def main(Server : server.IcodeServer) -> None: # 主程序
    await Server.login() # 登录
    await Server.run() # 开始监听
    await Server.closeBot() # 关闭服务器

asyncio.run(main(SERVER)) # 主程序最高级入口

停止服务器运行

在运行服务器后,服务器对象的__running属性将为True,如果想要停止服务器运行,可以调用stopRunning方法。在调用后服务器不会立即停止,而是完成最后一次迭代后退出运行状态。

def stopRunning(self)
也可以在终端使用ctrl + c快捷键(Windows操作系统)终止服务器运行。
在服务器结束运行后可以使用results属性查看运行的总结果。

注册监听事件

在服务器运行前可以使用装饰器注册需要监听的事件。
监听消息中心
使用server.IcodeServer.CheckMessage方法注册新的消息中心监听事件。

def CheckMessage(self,
                workList : list[str] | None = None,
                checkFunc : Callable[[str, str], bool] = (lambda x, y : True),
                autoRead : bool = True,
                checkRead : bool = True,
                checkNum : int = 30,
                checkSame : bool = True,
                name : Any = None) -> Callable

workList : 只监测在这个列表中的作品的消息,如果为None则不进行过滤
checkFunc : 消息检测函数,返回真则进行处理,否则不进行处理。方法内部会给这个函数传入两个参数,第一个为消息内容,第二个为消息发送者的userId
autoRead : 是否在处理完检测到的消息后自动将其设为已读
checkRead : 是否只检测未读的消息
checkNum : 检测消息的条数,如果超过这个数值则不进行处理
checkSame : 是否检测与上一次检测到的消息相同的消息
name : 给这个监听事件命名,方便调试

示例:

from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
SERVER.debug = False # 关闭debug模式

@SERVER.CheckMessage(checkFunc = (lambda x, y : x == '你真帅'), 
                     checkNum = 50) # 注册新的监听事件
def checkmessage(userId : str) -> None:
    print('又有一个人夸你帅,这个人是', userId)

server.RunServer(server = SERVER) # 运行服务器
装饰器传入的回调函数可以包含四个参数,server.IcodeServer.CheckMessage会自动读取参数列表并传入对应信息:

content : 检测到的信息内容
userId : 检测到的信息发送者的userId
api : server.IcodeServer__api属性
bot : 服务器对象本身

新的监听事件注册完毕后,CheckMessage生成的新的监听函数一般会返回:

[
# First call
    {
        "code": 0,
        "msg": "success",
        "user": {
            "userId": "xxx",
            "name": "xxx",
            "image": "xxx",
            "mobile": "xxx"
        }
    }, # Function's return
# Second call
    {
        "code": 0,
        "msg": "success",
        "user": {
            "userId": "xxx",
            "name": "xxx",
            "image": "xxx",
            "mobile": "xxx"
        }
    }, # Function's return
    <function object>, # Function Object
    'Enshrine work1' # Function's name
]
包含所有消息的回调函数的返回值,和CheckMessage方法本身以及CheckMessage在注册时传入的name参数。
该数据会加入server.IcodeServerresults属性。

监听单个作品信息
使用server.IcodeServer.CheckWork注册单个作品数据的监听事件。

def CheckWork(self, workId : str,
            addBrowseNum : bool = False,
            checkFunc : Callable | None = (lambda **kwargs : bool(kwargs)),
            checkSame : bool = True,
            name : str = None) -> Callable[[Callable], Awaitable]

workId : 作品id
addBrowseNum : 在监视数据时是否增加浏览量(AsyncIcodeAPI.getWorkDetail的缘由)
checkFunc : 检测函数,当返回真时才会触发回调函数。方法内部会按照装饰器传入的函数的参数给这个参数传入同样的参数。
checkSame : 是否检测与上一次检测到相同的数据。
name : 给这个监听事件命名,方便调试

装饰器传入的回调函数可以包含6个参数,如下所示:

detail : 使用AsyncIcodeAPI.getWorkDetail获取到该作品的信息
submitInfo : 使用AsyncIcodeAPI.getWorkSubmitInfo获取到该作品的提交信息
comments : 使用AsyncIcodeAPI.getWorkComments获取到该作品的评论信息
moreWorks : 使用AsyncIcodeAPI.getMoreWorks获取到该作品的更多作品信息
api : 该服务器对象的__api属性
bot : 该服务器对象本身

示例:

from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
SERVER.debug = False # 关闭debug模式

@SERVER.CheckWork('a1f09b5eb34a48dfbdc8dee59d130ec6', # 注册监听事件
                  checkFunc = (lambda **info : info.get('detail').get('likeNum')>16)) # 检测点赞量
def stop(detail : dict, bot : server.IcodeServer):
    print('16赞达成')
    bot.stopRunning() # 停止服务器运行

server.RunServer(server = SERVER) # 运行服务器
该方法的返回值和server.IcodeServer.CheckMessage大致相同,也会加入server.IcodeServer.results

监听单个用户信息
使用server.IcodeServer.CheckPerson注册单个用户的监听事件。

def CheckPerson(self, userId : str,
                checkFunc : Callable | None = (lambda **kwargs : True),
                checkSame : bool = True,
                name : str | None = None) -> Callable[[Callable], Awaitable]

userId : 用户id
checkFunc : 检测函数,当返回真时才会触发回调函数。方法内部会按照装饰器传入的函数的参数给这个参数传入同样的参数。
checkSame : 是否检测与上一次检测到相同信息。
name : 给这个监听事件命名,方便调试

装饰器传入的回调函数可以包含5个参数,如下所示:

info : 使用AsyncIcodeAPI.getPersonInfo获取到该用户的信息
works : 使用AsyncIcodeAPI.getPersonWorks获取到该用户的所有作品信息
enshrines : 使用AsyncIcodeAPI.getPersonEnshrines获取到该用户的收藏作品信息
api : 该服务器对象的__api属性
bot : 该服务器对象本身

示例:

from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
SERVER.debug = False # 关闭debug模式

@SERVER.CheckPerson('qqB60D8500058F9FBEA41D76E17167C37A', # 注册新的监听事件
                    checkFunc = (lambda **info : info.get('info').get('enshrinesNum')>122)) # 检测收藏量
def getInfo(info : dict, bot : server.IcodeServer):
    print('目标达成,目前收藏量:')
    print(info.get('enshrinesNum'))
    bot.stopRunning()  # 结束服务器运行

server.RunServer(server = SERVER) # 运行服务器
该方法的返回值和server.IcodeServer.CheckMessage大致相同,也会加入server.IcodeServer.results

监听作品集
使用server.IcodeServer.CheckWorks监听整个社区的作品集。

def CheckWorks(self, getNum : int, 
                sortType : int = 2, 
                theme : str = 'all', 
                codeLanguage : str = 'all', 
                keyword : Any = '',
                checkFunc : Callable | None = (lambda x : True),
                checkSame : bool = True,
                name : str | None = None) -> Callable[[Callable], Awaitable]

getNum : 获取作品的数量
sortType : 排序方式,1为按点赞量从大到小排序,2则为按发布时间从现在往以前排序。
theme : 作品主题,默认为all,即所有类型。可以为all , play , story , art , minecraft , scratch , turtle。
codeLanguage : 代码语言,默认为all,即所有语言。可以为all , blockly , scratch , python。
keyword : 获取到作品的检索关键词,默认为空。
checkFunc : 检测函数,当返回真时才会触发回调函数。方法内部会按照装饰器传入的函数的参数给这个参数传入同样的参数。
checkSame : 是否检测与上次检测到相同的作品。
name : 给这个监听事件命名,方便调试

装饰器传入的回调函数可包含以下几个参数:

works : AsyncIcodeAPI.getWorks的返回值
api : 服务器对象的__api属性
bot : 该服务器对象本身

取消监听事件

可以使用server.IcodeServer.cancel方法取消一个监听事件。

def cancel(self, func : Callable) -> None
示例:
from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器

@SERVER.CheckMessage() # 注册新的监听事件
async def getInfo(userId : str, api : AsyncIcodeAPI):
    work = await api.getPersonWorks(userId, 1, 1)
    await api.enshrine(work[0]['id'])

SERVER.cancel(getInfo) # 取消

server.RunServer(server = SERVER) # 运行服务器
在运行服务器后,服务器也不会监听消息中心。

异步的回调函数,评论、点赞阻塞管理和fastReply

当需要进行一些耗时较长的IO操作时,同步的回调函数是不能满足需求的,这时候就需要使用异步的回调函数。
示例:

from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器

@SERVER.CheckMessage() # 注册新的监听事件
async def getInfo(userId : str, api : AsyncIcodeAPI): # 获取AsyncIcodeAPI对象
    work = await api.getPersonWorks(userId, 1, 1) # 获取作品
    await api.enshrine(work[0]['id']) # 收藏

server.RunServer(server = SERVER) # 运行服务器
这段代码可以自动收藏来评论的人的作品。
评论、点赞阻塞管理
当我们在回调函数中使用api参数来进行对某个用户评论或点赞时,会出现风险。因为社区给评论和点赞设置了时间限制,分别为5秒和3秒内只能进行一次评论和点赞操作。
而评论、点赞的操作是异步的,这就代表在一个操作进行完成后不需要等待5或3秒下一个操作就会立即执行(因为它们都已被注册到asyncio事件循环中),这会导致很多操作出现因操作频繁请求被拦截的错误。
为了解决这个问题,server.IcodeServer提供了comment, replylike方法来代替AsyncIcodeAPIcomment, replylike方法。而且这些方法是同步的。
这些方法和AsyncIcodeAPIcomment, replylike方法的调用方式完全一样,参数设置完全相同。
当调用这些方法后,server.IcodeServer将把需要进行的操作放入一个等待序列中,在服务器处于运行中状态时,将由两个内部的协程将它们有序地执行,并且设置阻塞时间,当阻塞时间到达时,将自动执行下一个操作。这样就不会引发因操作频繁请求被拦截的错误。
示例:
from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器

@SERVER.CheckMessage() # 注册新的监听事件
async def getInfo(userId : str, api : AsyncIcodeAPI, bot : server.IcodeServer):
    work = await api.getPersonWorks(userId, getNum = 1)
    bot.comment(work[0]['id'], 'test') # 调用评论方法

server.RunServer(server = SERVER) # 运行服务器
另外,server.IcodeServer还提供了异步方法fastReply来快速在一个用户的第一个作品下评论一条内容。这样省去了一次获取用户作品的操作,简化了代码。
async def fastReply(self, userId : str, content : str) -> None
示例:
from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器

@SERVER.CheckMessage() # 注册新的监听事件
async def getInfo(userId : str, bot : server.IcodeServer):
    await bot.fastReply(userId, "hello world") # 快速回复

server.RunServer(server = SERVER) # 运行服务器

服务器池

如果想要多个账户同时监听数据,可以使用服务器池server.ServerPool

class ServerPool:
    __servers : list[IcodeServer]
    __executor : ProcessPoolExecutor

    def __init__(self, *servers : IcodeServer, 
            executor : ProcessPoolExecutor | None = None):
        self.__servers = servers
        self.__executor = executor or ProcessPoolExecutor()

__servers : 一个包含所有服务器对象server.IcodeServer的列表
__executor : 一个ProcessPoolExecutor对象,用于多进程执行服务器对象的任务

当初始化时传入一个服务器对象列表(需要作为可变参数形式)即可初始化服务器池。
server.ServerPool.login会自动将池内的所有服务器对象进行登录。并返回池中所有对象的登录状态。

async def login(self) -> list[bool]
可以使用server.ServerPool.addAPIserver.ServerPoool.addServers方法把多个AsyncIcodeAPI对象或多个IcodeServer对象添加到池中。如果加入的是AsyncIcodeAPI对象,则会自动生成新的IcodeServer对象并将AsyncIcodeAPI对象填入。
apiservers的序列需要作为可变参数形式。
async def addAPI(self, * api : AsyncIcodeAPI)
async def addServers(self, * servers : IcodeServer)
使用server.ServerPool.RunServersserver.ServerPool.stopRunning来运行所有服务器和停止所有服务器。
使用server.ServerPool.closePool来停止所有服务器的运行并关闭服务器池。
示例:
from icodeapi import server
from icodeapi import AsyncIcodeAPI
import asyncio
api = AsyncIcodeAPI(input()) # 实例化用户对象
api2 = AsyncIcodeAPI(input())  # 实例化第二个用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
SERVER2 = server.IcodeServer(api = api2) # 实例化第二个服务器

@SERVER.CheckMessage() # 注册新的监听事件
async def getInfo(userId : str, bot : server.IcodeServer):
    await bot.fastReply(userId, "hello world") # 快速回复

@SERVER2.CheckMessage() # 注册新的监听事件
async def getInfo2(userId : str, bot : server.IcodeServer):
    await bot.fastReply(userId, "world hello") # 快速回复

pool = server.ServerPool(*[SERVER, SERVER2]) # 创建池
async def main(pool : server.ServerPool): # 主程序
    await pool.login() # 登录
    await pool.RunServers() # 开始全部运行
    await pool.closePool() # 关闭池
asyncio.run(main(pool))