效果如下:在这里获取讯飞星火接口信息,填入Sparkapi.py文末
def config(): config = {} config["appid"] = "xxx" config["api_secret"] = "xxx" config["api_key"] = "xxx" config["domain"] = "generalv2" config["Spark_url"] = "ws://spark-api.xf-yun.com/v2.1/chat" return config
此处作者提供了自己的api供大家学习,用的是讯飞v2.1版本,可根据官方文档自行更改。
创建一个flask应用,将三个文件复制进去,index文件放入template中,运行ai.py
注:要下两个库,flask 和 websocket_client(不是websocket)
代码如下:
Sparkapi.py
import _thread as threadimport base64import datetimeimport hashlibimport hmacimport jsonfrom urllib.parse import urlparseimport sslfrom datetime import datetimefrom time import mktimefrom urllib.parse import urlencodefrom wsgiref.handlers import format_date_timeimport websocket # 使用websocket_clientanswer = ""class Ws_Param(object): # 初始化 def __init__(self, APPID, APIKey, APISecret, Spark_url): self.APPID = APPID self.APIKey = APIKey self.APISecret = APISecret self.host = urlparse(Spark_url).netloc self.path = urlparse(Spark_url).path self.Spark_url = Spark_url # 生成url def create_url(self): # 生成RFC1123格式的时间戳 now = datetime.now() date = format_date_time(mktime(now.timetuple())) # 拼接字符串 signature_origin = "host: " + self.host + "\n" signature_origin += "date: " + date + "\n" signature_origin += "GET " + self.path + " HTTP/1.1" # 进行hmac-sha256进行加密 signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'), digestmod=hashlib.sha256).digest() signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8') authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"' authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8') # 将请求的鉴权参数组合为字典 v = { "authorization": authorization, "date": date, "host": self.host } # 拼接鉴权参数,生成url url = self.Spark_url + '?' + urlencode(v) # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致 return url# 收到websocket错误的处理def on_error(ws, error): print("### error:", error)# 收到websocket关闭的处理def on_close(ws, one, two): print(" ")# 收到websocket连接建立的处理def on_open(ws): thread.start_new_thread(run, (ws,))def run(ws, *args): data = json.dumps(gen_params(appid=ws.appid, domain=ws.domain, question=ws.question)) ws.send(data)# 收到websocket消息的处理def on_message(ws, message): # print(message) data = json.loads(message) code = data['header']['code'] if code != 0: print(f'请求错误: {code}, {data}') ws.close() else: choices = data["payload"]["choices"] status = choices["status"] content = choices["text"][0]["content"] print(content, end="") global answer answer += content # print(1) if status == 2: ws.close()def gen_params(appid, domain, question): """ 通过appid和用户的提问来生成请参数 """ data = { "header": { "app_id": appid, "uid": "1234" }, "parameter": { "chat": { "domain": domain, "temperature": 0.5, "max_tokens": 2048 } }, "payload": { "message": { "text": question } } } return datadef main(appid, api_key, api_secret, Spark_url, domain, question): # print("星火:") wsParam = Ws_Param(appid, api_key, api_secret, Spark_url) websocket.enableTrace(False) wsUrl = wsParam.create_url() ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open) ws.appid = appid ws.question = question ws.domain = domain ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})# length = 0def getText(role, content): text = [] jsoncon = {} jsoncon["role"] = role jsoncon["content"] = content text.append(jsoncon) return textdef getlength(text): length = 0 for content in text: temp = content["content"] leng = len(temp) length += leng return lengthdef checklen(text): while (getlength(text) > 8000): del text[0] return text# # 用于配置大模型版本,默认“general/generalv2”# # domain = "general" # v1.5版本# domain = "generalv2" # v2.0版本# # 云端环境的服务地址# # Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat" # v1.5环境的地址# Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat" # v2.0环境的地址def config(): config = {} config["appid"] = "cd8f9610" config["api_secret"] = "YTEzMmFkYzY3YzY1NGI3MjZmZDkwNzBm" config["api_key"] = "24138b88ad0a72974fb99d37fb7951e1" config["domain"] = "generalv2" config["Spark_url"] = "ws://spark-api.xf-yun.com/v2.1/chat" return config
Ai.py
import osfrom flask import Flask, render_template, request, send_from_directoryimport timeimport SparkApiapp=Flask(__name__)@app.route("/", methods=["GET", "POST"])def xin_info(): if request.method == "GET": return render_template('index.html') elif request.method == "POST": try: text = [] text.clear appid = SparkApi.config()["appid"] api_secret = SparkApi.config()["api_secret"] api_key = SparkApi.config()["api_key"] domain = SparkApi.config()["domain"] Spark_url = SparkApi.config()["Spark_url"] req_dict = request.get_json() # 获取前端数据 Input = req_dict.get('input') # 获取前端数据json里面的ts关键字数据 question = SparkApi.checklen(SparkApi.getText("user", Input)) SparkApi.answer = "" print("星火:", end="") SparkApi.main(appid, api_key, api_secret, Spark_url, domain, question) answer_txt = SparkApi.getText("assistant", SparkApi.answer)[0]["content"] print(answer_txt) print_text = {"msg": answer_txt} return print_text except Exception as e: print_text = {"msg": str(e)} return print_textif __name__ == '__main__': app.run()
index.html
<!DOCTYPE html><html><head> <style> body { display: flex; justify-content: center; flex-direction: column; align-items: center; height: 100vh; } .chat-container { width: 400px; height: 500px; overflow-y: auto; padding: 20px; border: 1px solid #ccc; border-radius: 10px; background-color: #fff; box-shadow: 0 0 10px rgba(0, 0, 0, 0.1); } .message-container { display: flex; flex-direction: row; margin-bottom: 10px; } .user-message { margin-left: auto; /* 调整为右边 */ background-color: #4CAF50; color: white; padding: 10px; border-radius: 5px; } .bot-message { margin-right: auto; /* 调整为左边 */ background-color: #ccc; color: black; padding: 10px; border-radius: 5px; } .input-container { margin-top: 10px; display: flex; align-items: center; } .input-container input[type="text"] { flex: 1; width: 350px; padding: 5px; border: 1px solid #ccc; border-radius: 5px; } .input-container button { margin-left: 10px; width: 50px; padding: 5px 10px; background-color: #4CAF50; color: white; border: none; border-radius: 5px; cursor: pointer; } </style></head><body> <div class="chat-container" id="chat-window"></div> <form id="myForm" class="input-container"> <input type="text" id="input" name="input"> <button type="submit">发送</button> </form> <script src="https://code.jquery.com/jquery-3.5.1.min.js"></script> <script> $(document).ready(function() { // 监听表单提交事件 $('#myForm').submit(function(event) { event.preventDefault(); // 阻止默认表单提交行为 var input = $('#input').val(); // 获取输入框的值 // 创建新的消息元素 var message = $('<div class="message-container"></div>'); var userMessage = $('<div class="user-message"></div>').text(input); // 修改了变量名和类名 // 将消息添加到聊天窗口 message.append(userMessage); $('#chat-window').append(message); // AJAX POST请求到后端接口 $.ajax({ url: '/', type: 'post', contentType: 'application/json', data: JSON.stringify({ "input": input }), // 将输入数据转为JSON格式 success: function(response) { // 创建新的消息元素 var message = $('<div class="message-container"></div>'); var botMessage = $('<div class="bot-message"></div>').text(response.msg); // 修改了变量名和类名 // 将消息添加到聊天窗口 message.append(botMessage); $('#chat-window').append(message); // 滚动到最底部以显示最新消息 $('#chat-window').scrollTop($('#chat-window')[0].scrollHeight); }, error: function(xhr, status, error) { console.log(error); } }); // 清空输入框 $('#input').val(''); }); }); </script></body></html>
前端代码JS部分注释:
$(document).ready(function() {
:当文档(页面)加载完毕后,执行里面的代码,确保所有元素都已加载完成。
$('#myForm').submit(function(event) {
:监听表单提交事件,当表单(id为myForm)被提交时执行回调函数。
event.preventDefault();
:阻止表单的默认提交行为,即不让表单真正提交到服务器。
var input = $('#input').val();
:获取输入框(id为input)中的值。创建用户消息元素:
var message = $('<div class="message-container"></div>');
:创建一个消息容器元素。var userMessage = $('<div class="user-message"></div>').text(input);
:创建用户消息元素,并设置文本内容为用户输入的值。将用户消息添加到聊天窗口:
message.append(userMessage);
:将用户消息添加到消息容器中。$('#chat-window').append(message);
:将消息容器添加到聊天窗口中,显示用户输入的消息。发起 AJAX POST 请求到后端接口:
$.ajax({
:使用 jQuery 的 AJAX 函数发起请求。url: '/',
:请求的目标 URL。type: 'post',
:请求类型为 POST。contentType: 'application/json',
:发送的数据类型为 JSON 格式。data: JSON.stringify({ "input": input }),
:将用户输入的值转换为 JSON 格式发送给后端。success: function(response) {
:请求成功后的回调函数。在成功回调函数内部: 创建机器人消息元素。将机器人消息添加到聊天窗口。滚动到最底部以显示最新消息。
error: function(xhr, status, error) { console.log(error); }
:如果请求出错,则在控制台输出错误信息。
$('#input').val('');
:清空输入框,为下一次输入做准备。