Wolfram QQ 机器人

很久以前就有写一个这玩意的想法了,前一阵子了解到了 Wolfram 搞了一个面向开发者的 Wolfram Engine,于是打算搞一个基于这个的 QQBot。 因为 Wolfram Engine 部署的位置和系统等因素,不知为何 Python 的 Wolfram Client 接口一计算就会报错,于是只能用简单粗暴的使用 提取命令 -> 创建 .wl 脚本 -> 解释运行并获取结果 这种十分不优雅的解决方法了。 仍然依赖 Coolq HTTP API 下面是代码

    import re
    import os
    import json
    import html
    import time
    import urllib
    import random
    import requests
    import websocket
    import threading
    import subprocess
    
    qbot_api_event_url="ws://127.0.0.1:6700/event/"
    qbot_api_post_url="http://127.0.0.1:5700/"
    
    MasterID = ''
    calc_index = 0
    
    def shrink(string, length=300):
       string = str(string)
       length_s = len(string)
       if length_s < length:
           return string
       else:
           return '{} <{} more characters> {}'.format(string[:50], length_s - length + 100, string[-50:])
    
    def qbot_send_msg(user_id, message):
        print("\n<send_private_msg>")
        print("posting private message")    
        print(message)
        post = qbot_api_post_url + "send_msg"
        api_payload = {
            'user_id': user_id,
            'message': message
        }
        res = requests.post(post, data=api_payload)
        print(res)
        print('</send_private_msg>\n')
    
    def qbot_send_gmsg(group_id, message):
        print("\n<send_group_msg>")
        print("posting group message")    
        print(message)
        post = qbot_api_post_url + "send_group_msg"
        api_payload = {
            'group_id': group_id,
            'message': message
        }
        res = requests.post(post, data=api_payload)
        print(res)
        print('</send_group_msg>\n')
    
    class ext_calc:
        def __init__(self, expr):
            self.status = 1
            start_index = expr.find('calc ')
            words = ['File', 'Directory', 'System', 'Archive', 'Import', 'Export', 'Get', 'Put', 'Read', 'Save', 'Read', 'Write', 'Open', 'Close', 'Create', 'Run', 'Remote', 'External', 'Install', 'Library', 'Service', 'Send', 'Process', 'Load', 'Database', 'SQL', 'Mongo', 'Device']
            for word in words:
                if word in expr:
                    self.status = -1
                    print(word)
                    self.result = '噫,你想对咱做什么!(请勿使用任何与计算无关的函数!)'
            expr = expr[start_index+10:]
            expr = html.unescape(expr)
            exprT = 'TimeConstrained[' + expr +', 15]'
            print(exprT)
            self.expr = exprT
    
        def getret(self):
            if self.status == -1:
                return
            global calc_index
            calc_index = calc_index + 1
            file_name = "tmpfile" + str(calc_index) + '.wl'
            cmd = 'wolframscript -f ' + file_name + ' -print -charset UTF8'
            with open(file_name, 'w') as f:
               f.write(self.expr)
            print(cmd)
            result = subprocess.getoutput(cmd)
            if 'Invalid syntax' in result:
                self.result = '语法错误!'
                return
            elif 'Failed' in result:
                self.result = '其他错误!'
                return 
            if len(result) == 0:
                self.result = '咱还计算不出来你写的东西呢'
                return
            self.result = result.replace('\n','').replace(';','')
            os.system("rm tmpfile*")
    
    class qbot_event:
        def __init__(self, source_json):
            print("\n<process_event>\n")
            event_json = json.loads(source_json)
            self.post_type = event_json['post_type']
            print(event_json)
            if event_json['post_type'] == 'message':
                message = event_json
                self.msg_type = message['message_type']
                self.sender = message['sender']['user_id']
                if self.msg_type == 'group':
                    self.group_id = message['group_id']
                else:
                    self.group_id = 0
                self.msg = message['message']
                if self.sender == MasterID:
                    self.is_master == True
            elif event_json['post_type'] == 'request':
                self.req_type = event_json['request_type']
                self.flag = event_json['flag']
    
        def process_request(self):
            if self.req_type == 'friend':
                post = qbot_api_post_url + "set_friend_add_request"
            elif self.req_type == 'group':
                post = qbot_api_post_url + "set_group_add_request"
    
            print("\n<receive_request>")
            print("receive a new request!")    
            api_payload = {
                'flag': self.flag,
                'approve': True,
                'remark' : 'qbot添加好友'
            }
            res = requests.post(post, data=api_payload)
            print(res)
            print('</receive_request>\n')
            print('\n</process_event>')
    
        def process_message(self):
            print('sender:' + str(self.sender))
            print('send message:' + self.msg)
            print('send group id:' + str(self.group_id))
    
            if (self.msg[0] == '$'):
                msg = self.msg
                if '$wolfram ' in msg:
                    calc = ext_calc(msg)
                    calc.getret()
                    if self.msg_type == 'group':
                        qbot_send_gmsg(self.group_id, shrink(calc.result))
                    elif self.msg_type == 'private':
                        qbot_send_msg(self.sender, shrink(calc.result))
    
                elif '$help' in msg:
                    message = '咱是spinmry计算姬2.0\nWolfram语言解释器\n$wolfram  [表达式]\nWolfram语法帮助:https://reference.wolfram.com/language/\n注:本Bot不支持图片与交互式界面'
                    if self.msg_type == 'group':
                        qbot_send_gmsg(self.group_id, message)
                    elif self.msg_type == 'private':
                        qbot_send_msg(self.sender, message)
    
                else:
                    if self.msg_type == 'group':
                        qbot_send_gmsg(self.group_id, "咱不知道你在说什么呢?")
                    elif self.msg_type == 'private':
                        qbot_send_msg(self.sender, "咱不知道你在说什么呢?")
    
            print('\n</process_event>')
    
    def qbot_listen_process(event):
        event = qbot_event(event)
        if event.post_type == 'request':
            event.process_request()
        elif event.post_type == 'message':
            event.process_message()
    
    def qbot_listen_on_message(event_ws, event):
        th = threading.Thread(target=qbot_listen_process, name="NewEvent", args=(event,))
        th.start()
    
    def qbot_listen_on_error(event_ws, error):
        print(error)
    
    def qbot_listen_on_close(event_ws):
        print("API Websocket closed.")
    
    def main():
        websocket.enableTrace(True)
        event_ws = websocket.WebSocketApp(
            qbot_api_event_url,
            on_message = qbot_listen_on_message,
            on_error = qbot_listen_on_error,
            on_close = qbot_listen_on_close
        )
        event_ws.run_forever()
    
    main()

Comments

polarnova: Orz