构建服务器
构建服务器
如果您想让您的账号在社区上充当一个机器人的角色,那么server
模块就是专门为你而开发。
icodeapi提供了server
模块以构建有道小图灵的账号机器人,可以做到监视某些数据并执行回调函数。
该模块在1.0.3版本被加入。
登录
使用server.IcodeServer
类构建一个服务器。
class IcodeServer:
'''
Build your icode bot by AsyncIcodeAPI
learn more:https://xbzstudio.github.io/icodeapi/docs/Server.html/
'''
__api : AsyncIcodeAPI
__events : list[Awaitable]
__tocomment : list[Awaitable]
__tolike : list[Awaitable]
__lastData : Any
__running : bool
debug : bool
ban : set[str]
results : list[list[list]]
def __init__(self, api : AsyncIcodeAPI, ban : set[str] = set(), debug : bool = True) -> None:
self.__api = api
self.__events = []
self.__tocomment = []
self.__tolike = []
self.__running = False
self.ban = ban
self.debug = debug
self.results = []
self.__lastData = {}
__api : 服务器发出请求使用的
AsyncIcodeAPI
对象
__events : 服务器需要监听的事件列表
__tocomment : 服务器需要进行执行的评论操作列表
__tolike : 服务器需要进行执行的点赞操作列表
__lastData : 服务器上一次获取的数据(每个监听事件)
__running : 服务器是否正在运行
ban : 服务器操作黑名单集合
debug : 服务器是否开启调试模式
results : 服务器运行结果列表
__events
, __tocomment
, __toloke
, __lastData
和__running
这几个私有属性是及其低级的属性,请使用更高级的api,而不是直接使用它们。
示例
from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
SERVER.debug = False # 关闭debug模式
@SERVER.CheckMessage() # 注册新的监听事件
def checkmessage(content : str) -> None:
print('New Message: ', content)
server.RunServer(server = SERVER) # 运行服务器
checkmessage
。注意:
在运行这段示例代码时,如果用相同的账号给自己评论,不会输出任何内容。因为IcodeServer自动将自己加入黑名单。
运行服务器
如果想要开始监听所有已注册事件,可以使用server.RunServer
函数。
server : 需要运行的服务器对象
在server.IcodeServer
中也提供了一个异步的运行方法。
server.RunServer
会自动执行这些操作,这也是serevr.RunServer
的便捷之处。示例:
from icodeapi import server
from icodeapi import AsyncIcodeAPI
import asyncio
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
SERVER.debug = False # 关闭服务器调试模式
@SERVER.CheckMessage() # 注册新的监听事件
def checkmessage(content : str) -> None:
print('New Message: ', content)
async def main(Server : server.IcodeServer) -> None: # 主程序
await Server.login() # 登录
await Server.run() # 开始监听
await Server.closeBot() # 关闭服务器
asyncio.run(main(SERVER)) # 主程序最高级入口
停止服务器运行
在运行服务器后,服务器对象的__running
属性将为True
,如果想要停止服务器运行,可以调用stopRunning
方法。在调用后服务器不会立即停止,而是完成最后一次迭代后退出运行状态。
ctrl + c
快捷键(Windows操作系统)终止服务器运行。在服务器结束运行后可以使用
results
属性查看运行的总结果。
注册监听事件
在服务器运行前可以使用装饰器注册需要监听的事件。
监听消息中心
使用server.IcodeServer.CheckMessage
方法注册新的消息中心监听事件。
def CheckMessage(self,
workList : list[str] | None = None,
checkFunc : Callable[[str, str], bool] = (lambda x, y : True),
autoRead : bool = True,
checkRead : bool = True,
checkNum : int = 30,
checkSame : bool = True,
name : Any = None) -> Callable
workList : 只监测在这个列表中的作品的消息,如果为
None
则不进行过滤
checkFunc : 消息检测函数,返回真则进行处理,否则不进行处理。方法内部会给这个函数传入两个参数,第一个为消息内容,第二个为消息发送者的userId
autoRead : 是否在处理完检测到的消息后自动将其设为已读
checkRead : 是否只检测未读的消息
checkNum : 检测消息的条数,如果超过这个数值则不进行处理
checkSame : 是否检测与上一次检测到的消息相同的消息
name : 给这个监听事件命名,方便调试
示例:
from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
SERVER.debug = False # 关闭debug模式
@SERVER.CheckMessage(checkFunc = (lambda x, y : x == '你真帅'),
checkNum = 50) # 注册新的监听事件
def checkmessage(userId : str) -> None:
print('又有一个人夸你帅,这个人是', userId)
server.RunServer(server = SERVER) # 运行服务器
server.IcodeServer.CheckMessage
会自动读取参数列表并传入对应信息:
content : 检测到的信息内容
userId : 检测到的信息发送者的userId
api :server.IcodeServer
的__api
属性
bot : 服务器对象本身
新的监听事件注册完毕后,CheckMessage
生成的新的监听函数一般会返回:
[
# First call
{
"code": 0,
"msg": "success",
"user": {
"userId": "xxx",
"name": "xxx",
"image": "xxx",
"mobile": "xxx"
}
}, # Function's return
# Second call
{
"code": 0,
"msg": "success",
"user": {
"userId": "xxx",
"name": "xxx",
"image": "xxx",
"mobile": "xxx"
}
}, # Function's return
<function object>, # Function Object
'Enshrine work1' # Function's name
]
CheckMessage
方法本身以及CheckMessage
在注册时传入的name参数。该数据会加入
server.IcodeServer
的results
属性。
监听单个作品信息
使用server.IcodeServer.CheckWork
注册单个作品数据的监听事件。
def CheckWork(self, workId : str,
addBrowseNum : bool = False,
checkFunc : Callable | None = (lambda **kwargs : bool(kwargs)),
checkSame : bool = True,
name : str = None) -> Callable[[Callable], Awaitable]
workId : 作品id
addBrowseNum : 在监视数据时是否增加浏览量(AsyncIcodeAPI.getWorkDetail
的缘由)
checkFunc : 检测函数,当返回真时才会触发回调函数。方法内部会按照装饰器传入的函数的参数给这个参数传入同样的参数。
checkSame : 是否检测与上一次检测到相同的数据。
name : 给这个监听事件命名,方便调试
装饰器传入的回调函数可以包含6个参数,如下所示:
detail : 使用
AsyncIcodeAPI.getWorkDetail
获取到该作品的信息
submitInfo : 使用AsyncIcodeAPI.getWorkSubmitInfo
获取到该作品的提交信息
comments : 使用AsyncIcodeAPI.getWorkComments
获取到该作品的评论信息
moreWorks : 使用AsyncIcodeAPI.getMoreWorks
获取到该作品的更多作品信息
api : 该服务器对象的__api
属性
bot : 该服务器对象本身
示例:
from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
SERVER.debug = False # 关闭debug模式
@SERVER.CheckWork('a1f09b5eb34a48dfbdc8dee59d130ec6', # 注册监听事件
checkFunc = (lambda **info : info.get('detail').get('likeNum')>16)) # 检测点赞量
def stop(detail : dict, bot : server.IcodeServer):
print('16赞达成')
bot.stopRunning() # 停止服务器运行
server.RunServer(server = SERVER) # 运行服务器
server.IcodeServer.CheckMessage
大致相同,也会加入server.IcodeServer.results
。
监听单个用户信息
使用server.IcodeServer.CheckPerson
注册单个用户的监听事件。
def CheckPerson(self, userId : str,
checkFunc : Callable | None = (lambda **kwargs : True),
checkSame : bool = True,
name : str | None = None) -> Callable[[Callable], Awaitable]
userId : 用户id
checkFunc : 检测函数,当返回真时才会触发回调函数。方法内部会按照装饰器传入的函数的参数给这个参数传入同样的参数。
checkSame : 是否检测与上一次检测到相同信息。
name : 给这个监听事件命名,方便调试
装饰器传入的回调函数可以包含5个参数,如下所示:
info : 使用
AsyncIcodeAPI.getPersonInfo
获取到该用户的信息
works : 使用AsyncIcodeAPI.getPersonWorks
获取到该用户的所有作品信息
enshrines : 使用AsyncIcodeAPI.getPersonEnshrines
获取到该用户的收藏作品信息
api : 该服务器对象的__api
属性
bot : 该服务器对象本身
示例:
from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
SERVER.debug = False # 关闭debug模式
@SERVER.CheckPerson('qqB60D8500058F9FBEA41D76E17167C37A', # 注册新的监听事件
checkFunc = (lambda **info : info.get('info').get('enshrinesNum')>122)) # 检测收藏量
def getInfo(info : dict, bot : server.IcodeServer):
print('目标达成,目前收藏量:')
print(info.get('enshrinesNum'))
bot.stopRunning() # 结束服务器运行
server.RunServer(server = SERVER) # 运行服务器
server.IcodeServer.CheckMessage
大致相同,也会加入server.IcodeServer.results
。
监听作品集
使用server.IcodeServer.CheckWorks
监听整个社区的作品集。
def CheckWorks(self, getNum : int,
sortType : int = 2,
theme : str = 'all',
codeLanguage : str = 'all',
keyword : Any = '',
checkFunc : Callable | None = (lambda x : True),
checkSame : bool = True,
name : str | None = None) -> Callable[[Callable], Awaitable]
getNum : 获取作品的数量
sortType : 排序方式,1为按点赞量从大到小排序,2则为按发布时间从现在往以前排序。
theme : 作品主题,默认为all
,即所有类型。可以为all , play , story , art , minecraft , scratch , turtle。
codeLanguage : 代码语言,默认为all
,即所有语言。可以为all , blockly , scratch , python。
keyword : 获取到作品的检索关键词,默认为空。
checkFunc : 检测函数,当返回真时才会触发回调函数。方法内部会按照装饰器传入的函数的参数给这个参数传入同样的参数。
checkSame : 是否检测与上次检测到相同的作品。
name : 给这个监听事件命名,方便调试
装饰器传入的回调函数可包含以下几个参数:
works :
AsyncIcodeAPI.getWorks
的返回值
api : 服务器对象的__api
属性
bot : 该服务器对象本身
取消监听事件
可以使用server.IcodeServer.cancel
方法取消一个监听事件。
from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
@SERVER.CheckMessage() # 注册新的监听事件
async def getInfo(userId : str, api : AsyncIcodeAPI):
work = await api.getPersonWorks(userId, 1, 1)
await api.enshrine(work[0]['id'])
SERVER.cancel(getInfo) # 取消
server.RunServer(server = SERVER) # 运行服务器
异步的回调函数,评论、点赞阻塞管理和fastReply
当需要进行一些耗时较长的IO操作时,同步的回调函数是不能满足需求的,这时候就需要使用异步的回调函数。
示例:
from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
@SERVER.CheckMessage() # 注册新的监听事件
async def getInfo(userId : str, api : AsyncIcodeAPI): # 获取AsyncIcodeAPI对象
work = await api.getPersonWorks(userId, 1, 1) # 获取作品
await api.enshrine(work[0]['id']) # 收藏
server.RunServer(server = SERVER) # 运行服务器
评论、点赞阻塞管理
当我们在回调函数中使用api参数来进行对某个用户评论或点赞时,会出现风险。因为社区给评论和点赞设置了时间限制,分别为5秒和3秒内只能进行一次评论和点赞操作。
而评论、点赞的操作是异步的,这就代表在一个操作进行完成后不需要等待5或3秒下一个操作就会立即执行(因为它们都已被注册到asyncio事件循环中),这会导致很多操作出现因操作频繁请求被拦截的错误。
为了解决这个问题,
server.IcodeServer
提供了comment
, reply
和like
方法来代替AsyncIcodeAPI
的comment
, reply
和like
方法。而且这些方法是同步的。这些方法和
AsyncIcodeAPI
的comment
, reply
和like
方法的调用方式完全一样,参数设置完全相同。当调用这些方法后,
server.IcodeServer
将把需要进行的操作放入一个等待序列中,在服务器处于运行中状态时,将由两个内部的协程将它们有序地执行,并且设置阻塞时间,当阻塞时间到达时,将自动执行下一个操作。这样就不会引发因操作频繁请求被拦截的错误。示例:
from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
@SERVER.CheckMessage() # 注册新的监听事件
async def getInfo(userId : str, api : AsyncIcodeAPI, bot : server.IcodeServer):
work = await api.getPersonWorks(userId, getNum = 1)
bot.comment(work[0]['id'], 'test') # 调用评论方法
server.RunServer(server = SERVER) # 运行服务器
server.IcodeServer
还提供了异步方法fastReply
来快速在一个用户的第一个作品下评论一条内容。这样省去了一次获取用户作品的操作,简化了代码。示例:
from icodeapi import server
from icodeapi import AsyncIcodeAPI
api = AsyncIcodeAPI(input()) # 实例化用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
@SERVER.CheckMessage() # 注册新的监听事件
async def getInfo(userId : str, bot : server.IcodeServer):
await bot.fastReply(userId, "hello world") # 快速回复
server.RunServer(server = SERVER) # 运行服务器
服务器池
如果想要多个账户同时监听数据,可以使用服务器池server.ServerPool
。
class ServerPool:
__servers : list[IcodeServer]
__executor : ProcessPoolExecutor
def __init__(self, *servers : IcodeServer,
executor : ProcessPoolExecutor | None = None):
self.__servers = servers
self.__executor = executor or ProcessPoolExecutor()
__servers : 一个包含所有服务器对象
server.IcodeServer
的列表
__executor : 一个ProcessPoolExecutor
对象,用于多进程执行服务器对象的任务
当初始化时传入一个服务器对象列表(需要作为可变参数形式)即可初始化服务器池。
server.ServerPool.login
会自动将池内的所有服务器对象进行登录。并返回池中所有对象的登录状态。
server.ServerPool.addAPI
和server.ServerPoool.addServers
方法把多个AsyncIcodeAPI
对象或多个IcodeServer
对象添加到池中。如果加入的是AsyncIcodeAPI
对象,则会自动生成新的IcodeServer
对象并将AsyncIcodeAPI
对象填入。api
和servers
的序列需要作为可变参数形式。使用
server.ServerPool.RunServers
和server.ServerPool.stopRunning
来运行所有服务器和停止所有服务器。使用
server.ServerPool.closePool
来停止所有服务器的运行并关闭服务器池。示例:
from icodeapi import server
from icodeapi import AsyncIcodeAPI
import asyncio
api = AsyncIcodeAPI(input()) # 实例化用户对象
api2 = AsyncIcodeAPI(input()) # 实例化第二个用户对象
SERVER = server.IcodeServer(api = api) # 实例化服务器
SERVER2 = server.IcodeServer(api = api2) # 实例化第二个服务器
@SERVER.CheckMessage() # 注册新的监听事件
async def getInfo(userId : str, bot : server.IcodeServer):
await bot.fastReply(userId, "hello world") # 快速回复
@SERVER2.CheckMessage() # 注册新的监听事件
async def getInfo2(userId : str, bot : server.IcodeServer):
await bot.fastReply(userId, "world hello") # 快速回复
pool = server.ServerPool(*[SERVER, SERVER2]) # 创建池
async def main(pool : server.ServerPool): # 主程序
await pool.login() # 登录
await pool.RunServers() # 开始全部运行
await pool.closePool() # 关闭池
asyncio.run(main(pool))