如何在Python中使用ChatGPT API进行实时数据处理

Python中使用ChatGPT API实时数据处理

OpenAI的GPT已经成为全球最主要的AI工具,并且在根据其训练数据回答查询方面表现出色。然而,它无法回答关于未知主题的问题:

  • 2021年9月之后的最新事件
  • 您的非公开文件
  • 过去的对话中的信息

当您处理经常变化的实时数据时,这个任务变得更加复杂。此外,您不能将大量内容提供给GPT,它也无法在长时间内保留您的数据。在这种情况下,您需要高效地构建一个自定义的LLM(语言学习模型)应用程序,以为答案过程提供上下文。本文将引导您使用Python中的开源LLM应用程序库开发此类应用程序的步骤。源代码位于GitHub上(在“构建用于销售的ChatGPT Python API”一节中提供链接)。

学习目标

本文将介绍以下内容:

  • 为什么您需要向ChatGPT添加自定义数据
  • 如何利用嵌入、提示工程和ChatGPT来进行更好的问答
  • 使用LLM应用程序构建自己的ChatGPT,包括自定义数据
  • 创建用于查找实时折扣或销售价格的ChatGPT Python API

为什么要为ChatGPT提供自定义知识库?

在介绍增强ChatGPT的方法之前,让我们先了解一下手动方法并确定它们的挑战。通常,通过提示工程来扩展ChatGPT。假设您想从各种在线市场查找实时折扣/优惠券。

例如,当您向ChatGPT询问“能否为我找到本周阿迪达斯男鞋的折扣?”时,您可以从ChatGPT UI界面得到的标准回答是:

显然,GPT提供了关于如何找到折扣的一般建议,但在具体的折扣位置或类型等细节方面缺乏特定性。为了帮助模型,我们从可信的数据源补充了折扣信息。在发布实际问题之前,您必须通过添加初始文档内容与ChatGPT进行交互。我们将从亚马逊产品交易数据集中收集这些示例数据,并仅插入一个JSON项作为提示:

如您所见,您得到了预期的输出,并且这很容易实现,因为ChatGPT现在具有上下文意识。然而,这种方法的问题在于模型的上下文受限(GPT-4的最大文本长度为8,192个标记)。当输入数据庞大时,您可能会遇到问题,您可能期望在销售中发现数千个商品,但无法将这么多的数据作为输入消息提供。此外,一旦您收集了数据,您可能希望对数据进行清洗、格式化和预处理,以确保数据的质量和相关性。

如果您使用OpenAI的Chat Completion端点或构建ChatGPT的自定义插件,将引入以下其他问题:

  • 成本 – 通过提供更详细的信息和示例,模型的性能可能会提高,但成本也会增加(对于输入为10k标记、输出为200标记的GPT-4,每次预测的成本为0.624美元)。重复发送相同的请求可能会增加成本,除非使用本地缓存系统。
  • 延迟 – 使用OpenAI等提供ChatGPT API的生产环境时存在的一个挑战是其不可预测性。无法保证提供一致的服务。
  • 安全性 – 当集成自定义插件时,必须在OpenAPI规范中指定每个API端点的功能。这意味着您将向ChatGPT透露您的内部API设置,这是许多企业对其持怀疑态度的风险。
  • 离线评估 – 对代码和数据输出进行离线测试或在本地复制数据流对开发人员来说是具有挑战性的。这是因为对系统的每个请求可能会产生不同的响应。

使用嵌入、提示工程和ChatGPT进行问答

您在互联网上找到的一个有前景的方法是利用LLM来创建嵌入,然后使用这些嵌入构建您的应用程序,例如搜索和问答系统。换句话说,您不会使用Chat Completion端点查询ChatGPT,而是执行以下查询:

给定以下折扣数据:{input_data},回答以下查询:{user_query}

概念很简单。方法首先通过OpenAI API为每个输入文档(文本、图像、CSV、PDF或其他类型的数据)创建向量嵌入,然后将生成的嵌入索引到用于快速检索的向量数据库中,并使用用户的问题从向量数据库中搜索并获取相关文档。然后将这些文档与问题一起作为提示呈现给ChatGPT。有了这个额外的上下文,ChatGPT可以像在内部数据集上进行训练一样进行响应。

另一方面,如果您使用Pathway的LLM应用程序,则甚至不需要任何向量数据库。它直接从任何兼容存储中实时读取数据,而无需查询带有增加的准备工作、基础设施和复杂性等成本的向量文档数据库。保持源数据和向量同步非常困难。而且,如果下划线的输入数据随着时间的推移而发生变化并需要重新索引,这更加困难。

使用LLM应用程序使用自定义数据的ChatGPT

以下简单的步骤解释了使用LLM应用程序为您的数据构建ChatGPT应用程序的数据流水线方法。

1. 收集:您的应用程序以流媒体模式启用Pathway时,实时从各种数据源(CSV、JSON Lines、SQL数据库、Kafka、Redpanda、Debezium等等)读取数据。它还将每个数据行映射到结构化文档模式,以更好地管理大型数据集。
2. 预处理:可选地,您可以通过删除重复项、不相关信息和噪声数据来轻松清理数据,这可能会影响到您的响应质量,并提取您需要用于进一步处理的数据字段。此外,在此阶段,您可以掩盖或隐藏隐私数据,以避免将其发送给ChatGPT。
3. 嵌入:使用OpenAI API对每个文档进行嵌入,并检索嵌入结果。
4. 索引:实时在生成的嵌入上构建索引。
5. 搜索:假设从API友好的界面接收用户问题,使用嵌入生成查询的向量。使用嵌入,根据与查询的相关性检索向量索引。
6. 提问:将问题和最相关的部分插入到发送给GPT的消息中。返回GPT的答案(聊天完成终点)。

为销售构建ChatGPT Python API

一旦我们清楚了LLM应用程序在前一节中的工作流程,您可以按照以下步骤了解如何构建折扣查找应用程序。项目源代码可以在GitHub上找到。如果您想快速开始使用该应用程序,可以跳过此部分,克隆存储库,并按照README.md文件中的说明运行代码示例。

示例项目目标

受到企业搜索的这篇文章的启发,我们的示例应用程序应该在Python中公开一个HTTP REST API端点,以通过从各种来源(CSV、Jsonlines、API、消息代理或数据库)检索最新交易来回答用户关于当前销售的查询,并利用OpenAI API嵌入和聊天完成端点生成AI助手的响应。

第一步:数据收集(自定义数据摄取)

为了简单起见,我们可以使用任何JSON Lines作为数据源。该应用程序使用类似discounts.jsonl的JSON Lines文件作为处理用户查询时的数据。数据源希望每一行都有一个doc对象。确保您首先将输入数据转换为Jsonlines格式。以下是带有单个原始行的Jsonline文件的示例:

酷的部分是,该应用程序始终知道数据文件夹中的更改。如果您添加另一个JSON Lines文件,LLM应用程序会自动更新AI模型的响应。

步骤2:数据加载和映射

使用Pathway的JSON Lines输入连接器,我们将读取本地JSONlines文件,将数据条目映射到模式,并创建Pathway表。在app.py中查看完整的源代码:

...
sales_data = pw.io.jsonlines.read(
    "./examples/data",
    schema=DataInputSchema,
    mode="streaming"
)

将每个数据行映射到结构化文档模式。在app.py中查看完整的源代码:

class DataInputSchema(pw.Schema):
    doc: str

步骤3:数据嵌入

每个文档都使用OpenAI API进行嵌入并检索嵌入结果。在embedder.py中查看完整的源代码:

...
embedded_data = embeddings(context=sales_data, data_to_embed=sales_data.doc)

步骤4:数据索引

然后我们在生成的嵌入上构建即时索引:

index = index_embeddings(embedded_data)

步骤5:用户查询处理和索引

我们创建一个REST端点,从API请求有效负载中获取用户查询,并使用OpenAI API嵌入用户查询。

...
query, response_writer = pw.io.http.rest_connector(
    host=host,
    port=port,
    schema=QueryInputSchema,
    autocommit_duration_ms=50,
)

embedded_query = embeddings(context=query, data_to_embed=pw.this.query)

步骤6:相似度搜索和提示工程

我们使用索引进行相似度搜索,以识别与查询嵌入最相关的匹配项。然后我们构建一个提示,将用户的查询与获取的相关数据结果合并,并发送消息到ChatGPT完成端点以生成适当和详细的响应。

responses = prompt(index, embedded_query, pw.this.query)

当我们构建提示并在prompt.py中向ChatGPT添加内部知识时,我们遵循了相同的上下文学习方法。

prompt = f"给定以下折扣数据:\\n {docs_str} \\n回答此查询:{query}"

步骤7:返回响应

最后一步就是将API的响应返回给用户。

# 使用索引数据构建提示
responses = prompt(index, embedded_query, pw.this.query)

步骤9:将所有内容放在一起

现在,如果我们将所有上述步骤放在一起,您将拥有用于自定义折扣数据的LLM启用的Python API,就像在app.py Python脚本中看到的实现一样。

import pathway as pw

from common.embedder import embeddings, index_embeddings
from common.prompt import prompt


def run(host, port):
    # 作为查询从API获取用户问题
    query, response_writer = pw.io.http.rest_connector(
        host=host,
        port=port,
        schema=QueryInputSchema,
        autocommit_duration_ms=50,
    )

    # 实时数据来自外部数据源,如jsonlines文件
    sales_data = pw.io.jsonlines.read(
        "./examples/data",
        schema=DataInputSchema,
        mode="streaming"
    )

    # 使用OpenAI嵌入API为每个文档计算嵌入
    embedded_data = embeddings(context=sales_data, data_to_embed=sales_data.doc)

    # 实时在生成的嵌入上构建索引
    index = index_embeddings(embedded_data)

    # 使用OpenAI嵌入API为查询生成嵌入
    embedded_query = embeddings(context=query, data_to_embed=pw.this.query)

    # 使用索引数据构建提示
    responses = prompt(index, embedded_query, pw.this.query)

    # 将提示提供给ChatGPT并获取生成的答案
    response_writer(responses)

    # 运行流水线
    pw.run()


class DataInputSchema(pw.Schema):
    doc: str


class QueryInputSchema(pw.Schema):
    query: str

(可选) 第10步:添加交互式用户界面

为了使您的应用更加交互式和用户友好,您可以使用Streamlit来构建一个前端应用。请查看此app.py文件中的实现。

运行应用

按照README.md文件(前面链接的文件)中的“如何运行项目”部分的指示,您可以开始询问有关折扣的问题,API将根据您添加的折扣数据源进行响应。

当我们使用UI(应用数据源)向GPT提供此知识后,看它如何回复:

该应用同时考虑Rainforest API和discounts.csv文件文档(即时合并这些源中的数据),实时索引它们,并在处理查询时使用这些数据。

进一步改进

通过向ChatGPT添加领域特定的知识(如折扣),我们只发现了LLM应用的一些功能。您还可以实现更多的事情:

  • 与来自外部API的其他数据一起,包括各种文件(如Jsonlines、PDF、Doc、HTML或文本格式),像PostgreSQL或MySQL这样的数据库,以及来自Kafka、Redpanda或Debedizum等平台的流数据。
  • 保持数据快照以观察销售价格随时间的变化,因为Pathway提供了一个内置功能来计算两个更改之间的差异。
  • 除了通过API访问数据之外,LLM应用还允许您将处理过的数据传递给其他下游连接器,如BI和分析工具。例如,设置它在检测到价格变动时接收警报。