Using ChatGPT over email

  •   ipoh · 2023-03-26 23:51:53 +08:00 · 1776 views

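    Register a new Gmail account to receive questions, send each question to OpenAI, and reply to the sender with the answer. There are many ways to host the receiving side: cloud functions, Google App Engine, a VPS, and so on.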

    Below is the code, written with the help of ChatGPT.

    #coding=utf-8
    
    import json
    import imaplib
    import email
    import smtplib
    from email.mime.text import MIMEText
    from email.header import decode_header
    import time
    import openai
    
    ADMIN = '[email protected]'
    WHITELIST = []
    
    # Mailbox credentials and settings
    mail_username = '[email protected]'
    mail_password = 'password'
    mail_server = 'imap.gmail.com'
    openai.api_key = "sk-APIKEY"  # your OpenAI API key
    
    TASK = []
    ANSWER = []
    
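    def ask_gpt():
        """Send every queued question to the chat model and queue the reply."""
        # Note: openai.ChatCompletion is the interface of the openai package before 1.0
        for task in TASK:
            try:
                completion = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=[{"role": "user", "content": task[2]}])
                answer = completion.choices[0].message.content
                print(answer)
                ANSWER.append((task[0], task[1], answer))
            except Exception as ex:
                # Mail the error text back so the sender knows the request failed
                ANSWER.append((task[0], task[1], str(ex)))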
    
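    def in_whitelist(From):
        """Return the allowed address contained in From, or an empty string.

        This is a substring test on the raw From header.
        """
        if ADMIN in From: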
            return ADMIN
        for item in WHITELIST:
            if item in From:
                return item
        return ""
    
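    def get_header(header_text, default='utf8'):
        """Decode an RFC 2047 encoded header into a plain string."""
        if header_text is None:
            # Header is missing (e.g. no Subject)
            return ""
        try:
            headers = decode_header(header_text)
        except Exception:
            print('Error while decoding header, using the header without decoding')
            return header_text
    
        header_sections = []
        for text, charset in headers:
            if isinstance(text, bytes):
                # decode_header() returns bytes for encoded words; fall back to
                # the default charset when none is declared or it is unknown
                try:
                    header_section = text.decode(charset or default, errors='ignore')
                except LookupError:
                    header_section = text.decode(default, errors='ignore')
            else:
                header_section = text
            if header_section:
                header_sections.append(header_section)
        return ' '.join(header_sections)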
    
    def decodeBody(msgPart):
        """
        Decode the text content of one message part.
        :param msgPart: one part of the email
        """
        contentType = msgPart.get_content_type()  # e.g. text/plain or text/html
        textContent = ""
        if contentType == 'text/plain' or contentType == 'text/html':
            content = msgPart.get_payload(decode=True)
            # get_content_charset() reads the charset parameter of Content-Type
            # and copes with quotes and extra parameters; fall back to UTF-8
            charset = msgPart.get_content_charset() or 'utf-8'
            if content:
                try:
                    textContent = content.decode(charset, errors='replace')
                except LookupError:
                    # Unknown charset name
                    textContent = content.decode('utf-8', errors='replace')
        return textContent
    
    def get_mail():
        # Log in to the mailbox
        mail = imaplib.IMAP4_SSL(mail_server)
        mail.login(mail_username, mail_password)
        mail.select('inbox')
    
        # Search for unread messages
        status, messages = mail.search(None, 'UNSEEN')
    
        for mail_id in messages[0].split():
            _, msg = mail.fetch(mail_id, '(RFC822)')
            email_message = email.message_from_bytes(msg[0][1])
    
            # Process the message
            # For example, print the subject and sender
            #print(f'Subject: {email_message["Subject"]}')
            #print(f'From: {email_message["From"]}')
            #print(email_message.get_payload(), email_message.is_multipart())
            Subject = get_header(email_message["Subject"])
            # A missing From header is None; treat it as an empty string
            From = in_whitelist(email_message["From"] or "")
            if From:
                # Extract the message body
                body = ""
                # walk() also visits parts nested inside multipart containers
                # and yields the message itself when it is not multipart;
                # take the first non-empty text part (normally text/plain)
                for messagePart in email_message.walk():
                    bodyContent = decodeBody(messagePart)
                    if bodyContent:
                        body = bodyContent
                        break
                print(From, Subject, body)
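                # Four consecutive CRLFs are treated as the end of the question;
                # anything after them is dropped
                body = body.split('\r\n\r\n\r\n\r\n')[0].strip()
                
                # "如题" means "see subject": use the subject line as the question
                if body == "如题" or body == "":
                    body = Subject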
                
                # Admin commands: the subject selects the action, the body is the address
                if From == ADMIN and Subject.lower() == "add email":
                    if body not in WHITELIST:
                        WHITELIST.append(body)
                        with open("whitelist.txt", "w") as wf:
                            wf.write(json.dumps(WHITELIST))
                elif From == ADMIN and Subject.lower() == "remove email":
                    if body in WHITELIST:
                        WHITELIST.remove(body)
                        with open("whitelist.txt", "w") as wf:
                            wf.write(json.dumps(WHITELIST))
                else:
                    TASK.append((From, Subject, body))
                
            # Mark the message as read
            mail.store(mail_id, '+FLAGS', '\\Seen')
    
        # Log out of the mailbox
        mail.close()
        mail.logout()
    
    def send_mail():
        for item in ANSWER:
            # Message body
            msg = MIMEText(item[2], 'plain', 'utf-8')
    
            # Subject, sender and recipient
            msg['Subject'] = 'Re: ' + item[1]
            msg['From'] = mail_username
            msg['To'] = item[0]
    
            # Send the message
            s = smtplib.SMTP('smtp.gmail.com', 587)
            s.starttls()
            s.login(mail_username, mail_password)
            s.sendmail(mail_username, [item[0]], msg.as_string())
            s.quit()
    
    if __name__ == "__main__":
        try:
            with open("whitelist.txt", "r") as f:
                WHITELIST = json.loads(f.read())
        except Exception as ex:
            print(str(ex))
        print(WHITELIST)
        while True:
            try:
                get_mail()
                if TASK:
                    print(TASK)
                    ask_gpt()
                    send_mail()
            except Exception as ex:
                print(str(ex))
            TASK = []
            ANSWER = []
            time.sleep(60)
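
A side note on the parsing helpers above: the standard library's newer email API (`policy.default`) can decode headers and bodies by itself, and `email.utils.parseaddr` makes an exact address match possible. With the substring test, a display name that merely contains the admin address would pass. The following is only a minimal sketch, not the original poster's method. `parse_question`, `is_allowed` and the `example.com` addresses are hypothetical names made up for illustration. Note that the From header can still be forged, so this closes only the display-name trick.

```python
from email import message_from_bytes, policy
from email.message import EmailMessage
from email.utils import parseaddr

ADMIN = "admin@example.com"  # hypothetical address, for illustration only


def parse_question(raw_bytes):
    """Return (sender_address, subject, body_text) for one raw message."""
    # policy.default yields an EmailMessage whose headers are already decoded
    msg = message_from_bytes(raw_bytes, policy=policy.default)
    # parseaddr splits 'Name <addr>' into (name, addr); keep only the address
    _, sender = parseaddr(str(msg.get("From", "")))
    subject = str(msg.get("Subject", ""))
    # Prefer the text/plain part, fall back to text/html
    part = msg.get_body(preferencelist=("plain", "html"))
    body = part.get_content().strip() if part is not None else ""
    return sender.lower(), subject, body


def is_allowed(sender, whitelist):
    """Exact, case-insensitive address match instead of a substring test."""
    return sender in {ADMIN.lower(), *(w.lower() for w in whitelist)}


# Usage: build a sample message, serialize it, and parse it back
sample = EmailMessage()
sample["From"] = "Alice <alice@example.com>"
sample["To"] = "bot@example.com"
sample["Subject"] = "你好"
sample.set_content("What is IMAP?")
sender, subject, body = parse_question(sample.as_bytes())

# A display name containing the admin address no longer matches
_, spoofed = parseaddr('"admin@example.com" <mallory@example.net>')
spoof_ok = is_allowed(spoofed.lower(), [])
```

In the script above, `parse_question(msg[0][1])` could stand in for the `get_header`/`decodeBody` pair inside `get_mail()`, with `is_allowed` replacing `in_whitelist`. That wiring is an assumption and has not been tested against a live Gmail inbox.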
    
    1 · brrruski · 2023-03-27 02:12:08 +08:00 via iPhone