用生成AI构建实时Slackbot

如何利用生成AI构建实时Slackbot

在这篇文章中,我将向您展示如何使用由Apache NiFi提供支持的Cloudera DataFlow实时与IBM WatsonX.AI基础大型语言模型进行交互。我们可以与任何基础模型一起工作,如Google FLAN T5 XXL或IBM Granite模型。

我将向您展示构建实时数据流水线的简易方法,将问题直接提供给在IBM Cloud上运行的安全WatsonX.AI模型,用于供应用程序如Slack和移动应用程序实时使用。我们将使用Cloudera Data Flow来处理所有的安全性、管理、谱系和治理。作为决策的一部分,我们可以根据提示的类型动态选择不同的WatsonX.AI模型。例如,如果我们想继续一个句子而不是回答问题,我可以选择不同的模型。对于回答问题,Google FLAN T5 XXL效果很好。如果我想继续句子,我会使用IBM Granite模型之一。

您会注意到WatsonX.AI模型返回我们所需结果的速度之快。我会进行一些快速的丰富和转换,然后将它们发送到Cloudera Apache Kafka,用于连续的分析和分发给许多其他应用程序、系统、平台和下游消费者。我们也会将答案输出给最初的请求者,这可以是Slack频道中的某个人或应用程序中的某个人。所有这些都是实时发生的,无需编码,完全具备治理、谱系、数据管理和安全性,并且可以实现任何规模和任何平台。

IBM和Cloudera在私有、公共和混合云环境中结合在一起,用于实时数据和人工智能,这只是一个开始。立即尝试吧。

逐步实时流程

首先,在Slack中,我输入一个问题:

“Q:集成生成式人工智能和Apache NiFi的好方法是什么?”

NiFi流程顶部

一旦输入了该问题,Slack服务器就会将这些事件发送到我们的注册服务。这可以在任何面向公众的地方托管。

  • (点击此处获取Slack API链接)

Slack API

启用后,您的服务器将开始接收每个Slack帖子的JSON事件。这在NiFi中接收和解析非常容易。即使在设计模式下,Cloudera DataFlow在公有云托管版中也能轻松接收安全的HTTPS REST调用。

NiFi流程顶部2

在流程的第一部分,我们收到了REST JSON帖子,内容如下所示。

Slackbot 1.0 (+https://api.slack.com/robots)application/jsonPOSTHTTP/1.1{  "token" : "qHvJe59yetAp1bao6wmQzH0C",  "team_id" : "T1SD6MZMF",  "context_team_id" : "T1SD6MZMF",  "context_enterprise_id" : null,  "api_app_id" : "A04U64MN9HS",  "event" : {    "type" : "message",    "subtype" : "bot_message",    "text" : "==== NiFi to IBM <http://WatsonX.AI|WatsonX.AI> LLM Answers\n\nOn Date: Wed, 15 Nov 20

这是一个非常丰富详细的JSON文件,我们可以将其立即作为原始数据推送到Apache Iceberg Open Cloud Lakehouse、Kafka主题或对象存储中作为JSON文档(增强选项)。我只是要解析我需要的部分。

EvaluateJSONPath

我们解析出频道ID和帖子的纯文本。我只想要来自general(“C1SD6N197”)的消息。然后,我将文本复制到一个输入字段中,这是 Hugging Face所需的。

我们检查输入:如果是股票或天气(还会有更多内容),我们避免调用LLM。

SELECT * FROM FLOWFILEWHERE upper(inputs) like '%WEATHER%'AND not upper(inputs) like '%LLM SKIPPED%'SELECT * FROM FLOWFILEWHERE upper(inputs) like '%STOCK%'AND not upper(inputs) like '%LLM SKIPPED%'SELECT * FROM FLOWFILEWHERE (upper(inputs) like 'QUESTION:%'OR upper(inputs) like 'Q:%') and not upper(inputs) like '%WEATHER%'and not upper(inputs) like '%STOCK%'

对于股票处理:

为了解析我们需要的股票,我使用了我的Open NLP处理器。

所以你需要下载处理器和实体提取模型。

然后我们将公司名称传递给AlphaVantage的HTTP REST端点,将公司名称转换为股票符号。在免费账户中,每天只有几次调用,所以如果失败了,我们就会绕过这一步,尝试使用您传递的任何内容。

使用RouteOnContent过滤错误消息。

然后我们使用QueryRecord处理器将CSV转换为JSON并进行过滤。

SELECT name as companyName, symbol  FROM FLOWFILEORDER BY matchScore DESCLIMIT 1

我们使用SplitRecord确保只有一条记录。然后我们运行EvaluateJsonPath将字段作为属性获取。

UpdateAttribute中,我们修剪股票符号,以防万一。

${stockSymbol:trim()}

然后我们将该股票符号传递给Twelve Data通过InvokeHTTP获取我们的股票数据。

然后我们得到了很多股票数据。

{  "meta" : {    "symbol" : "IBM",    "interval" : "1min",    "currency" : "USD",    "exchange_timezone" : "America/New_York",    "exchange" : "NYSE",    "mic_code" : "XNYS",    "type" : "Common Stock"  },  "values" : [ {    "datetime" : "2023-11-15 10:37:00",    "open" : "152.07001",    "high" : "152.08000",    "low" : "151.99500",    "close" : "152.00999",    "volume" : "8525"  }, {    "datetime" : "2023-11-15 10:36:00",    "open" : "152.08501",    "high" : "152.12250",    "low" : "152.08000",    "close" : "152.08501",    "volume" : "15204"  } ...

然后我们运行EvaluateJSONPath抓取交易所信息。

我们分叉记录以只获得一条记录,这只是返回给Slack。我们使用UpdateRecord调用来丰富股票数据和其他值。然后我们运行QueryRecord将我们限制在1条记录以发送到Slack。

SELECT * FROM FLOWFILEORDER BY 'datetime' DESCLIMIT 1

我们运行一个EvaluateJsonPath来获取最有价值的字段进行显示。

然后我们运行一个PutSlack,带上我们的消息。

LLM 跳过。${companyName} [${nlp_org_1}/${stockSymbol}] 的股票价值在${date}为${closeStockValue}。股票日期${stockdateTime}。股票交易所${exchange}

我们还有一个与公司名称分开的单独流程。

在第一步中,我们调用Yahoo Finance获取该股票的RSS头条新闻。

https://feeds.finance.yahoo.com/rss/2.0/headline?s=${stockSymbol:trim()}&region=US&lang=en-US

我们使用QueryRecord将RSS/XML记录转换为JSON。

然后我们运行SplitJSON来拆分新闻项。

我们运行SplitRecord以限制为一条记录。我们使用EvaluateJSONPath获取我们的Slack消息所需的字段。

然后我们运行UpdateRecord以完成我们的JSON。

然后我们将此消息发送到Slack。

LLM 跳过。${companyName} [${nlp_org_1}/${stockSymbol}] 的股票新闻信息在${date}${title} :${description}。${guid}文章日期${pubdate}

对于那些选择天气的人,我们会遵循类似的路线(我们应该在Aiven中使用Redis进行缓存),与股票相似。我们使用我的OpenNLP处理器来提取您可能想知道天气的地点。

下一步是使用处理器的输出构建要发送给我们的地理编码器的值。

weatherlocation = ${nlp_location_1:notNull():ifElse(${nlp_location_1}, "纽约市")}

如果我们找不到有效的位置,我会说“纽约市”。我们可以使用其他一些查询。我正在加载所有位置的工作,可以在其中进行一些高级的PostgreSQL搜索,或者可能是OpenSearch或矢量化数据存储。

我把那个位置传给Open Meteo,通过InvokeHTTP找到地理位置。

https://geocoding-api.open-meteo.com/v1/search?name=${weatherlocation:trim():urlEncode()}&count=1&language=en&format=json

然后,我们从结果中解析我们需要的值。

{ "results" : [ { "id" : 5128581, "name" : "纽约", "latitude" : 40.71427, "longitude" : -74.00597, "elevation" : 10.0, "feature_code" : "PPL", "country_code" : "US", "admin1_id" : 5128638, "timezone" : "America/New_York", "population" : 8175133, "postcodes" : [ "10001", "10002", "10003", "10004", "10005", "10006", "10007", "10008", "10009", "10010", "10011", "10012", "10013", "10014", "10016", "10017", "10018", "10019", "10020", "10021", "10022", "10023", "10024", "10025", "10026", "10027", "10028", "10029", "10030", "10031", "10032", "10033", "10034", "10035", "10036", "10037", "10038", "10039", "10040", "10041", "10043", "10044", "10045", "10055", "10060", "10065", "10069", "10080", "10081", "10087", "10090", "10101", "10102", "10103", "10104", "10105", "10106", "10107", "10108", "10109", "10110", "10111", "10112", "10113", "10114", "10115", "10116", "10117", "10118", "10119", "10120", "10121", "10122", "10123", "10124", "10125", "10126", "10128", "10129", "10130", "10131", "10132", "10133", "10138", "10150", "10151", "10152", "10153", "10154", "10155", "10156", "10157", "10158", "10159", "10160", "10161", "10162", "10163", "10164", "10165", "10166", "10167", "10168", "10169", "10170", "10171", "10172", "10173", "10174", "10175", "10176", "10177", "10178", "10179", "10185", "10199", "10203", "10211", "10212", "10213", "10242", "10249", "10256", "10258", "10259", "10260", "10261", "10265", "10268", "10269", "10270", "10271", "10272", "10273", "10274", "10275", "10276", "10277", "10278", "10279", "10280", "10281", "10282", "10285", "10286" ], "country_id" : 6252001, "country" : "美国", "admin1" : "纽约" } ], "generationtime_ms" : 0.92196465}

然后我们解析结果,以便通过InvokeHTTP调用另一个API获取该纬度和经度的当前天气。

https://api.weather.gov/points/${latitude:trim()},${longitude:trim()}

结果是地理JSON。

{    "@context": [        "https://geojson.org/geojson-ld/geojson-context.jsonld",        {            "@version": "1.1",            "wx": "https://api.weather.gov/ontology#",            "s": "https://schema.org/",            "geo": "http://www.opengis.net/ont/geosparql#",            "unit": "http://codes.wmo.int/common/unit/",            "@vocab": "https://api.weather.gov/ontology#",            "geometry": {                "@id": "s:GeoCoordinates",                "@type": "geo:wktLiteral"            },            "city": "s:addressLocality",            "state": "s:addressRegion",            "distance": {                "@id": "s:Distance",                "@type": "s:QuantitativeValue"            },            "bearing": {                "@type": "s:QuantitativeValue"            },            "value": {                "@id": "s:value"            },            "unitCode": {                "@id": "s:unitCode",                "@type": "@id"            },            "forecastOffice": {                "@type": "@id"            },            "forecastGridData": {                "@type": "@id"            },            "publicZone": {                "@type": "@id"            },            "county": {                "@type": "@id"            }        }    ],    "id": "https://api.weather.gov/points/40.7143,-74.006",    "type": "Feature",    "geometry": {        "type": "Point",        "coordinates": [            -74.006,            40.714300000000001        ]    },    "properties": {        "@id": "https://api.weather.gov/points/40.7143,-74.006",        "@type": "wx:Point",        "cwa": "OKX",        "forecastOffice": "https://api.weather.gov/offices/OKX",        "gridId": "OKX",        "gridX": 33,        "gridY": 35,        "forecast": "https://api.weather.gov/gridpoints/OKX/33,35/forecast",        "forecastHourly": "https://api.weather.gov/gridpoints/OKX/33,35/forecast/hourly",        "forecastGridData": "https://api.weather.gov/gridpoints/OKX/33,35",        "observationStations": "https://api.weather.gov/gridpoints/OKX/33,35/stations",        "relativeLocation": {            "type": "Feature",            "geometry": {                "type": "Point",                "coordinates": [                    -74.0279259,                    40.745251000000003                ]            },            "properties": {                "city": "Hoboken",                "state": "NJ",                "distance": {                    "unitCode": "wmoUnit:m",                    "value": 3906.1522008034999                },                "bearing": {                    "unitCode": "wmoUnit:degree_(angle)",                    "value": 151                }            }        },        "forecastZone": "https://api.weather.gov/zones/forecast/NYZ072",        "county": "https://api.weather.gov/zones/county/NYC061",        "fireWeatherZone": "https://api.weather.gov/zones/fire/NYZ212",        "timeZone": "America/New_York",        "radarStation": "KDIX"    }}

我们使用EvaluateJSONPath获取天气预报URL。

然后通过invokeHTTP调用该天气预报URL。

这会产生一个更大的JSON输出,我们将解析该输出以获取要返回给Slack的结果。

我们使用EvaluateJSONPath解析数据来获取天气的主要字段。

然后将这些字段格式化为PutSlack

LLM Skipped. Read forecast on ${date} for ${weatherlocation} @ ${latitude},${longitude}Used ${forecasturl} ${icon} Temp: ${temperature} ${temperatureunit} - ${temperaturetrend}There is a wind ${winddirection} at ${windspeed}. ${detailedforecast}

Slack 输出


如果我们有一个 LLM 问题,让我们确保只记录一条。

我们使用几个不同的模型,这些模型可以在 IBM 云上的 IBM WatsonX.AI 上快速访问我们的 REST 提示。

我最初在 IBM 的 Prompt Lab 中测试和构建了这些提示,然后从那里复制了最初的 curl 语句。

点击这里查看 与 IBM watsonx.ai 可用的基础模型支持

ibm/mpt-7b-instruct2meta-llama/llama-2–70b-chatibm/granite-13b-chat-v1

我们需要将我们独特的安全密钥发送给 IBM,他们会给我们一个令牌来在下一步通话中使用。

我们解析问题然后通过 REST API 将其发送给 WatsonX。

我们构建的提示如下。

{  "model_id": "meta-llama/llama-2-70b-chat",  "input": "${inputs:urlEncode()}",  "parameters": {    "decoding_method": "greedy",    "max_new_tokens": 200,    "min_new_tokens": 50,    "stop_sequences": [],    "repetition_penalty": 1  },  "project_id": "0ead8ec4-d137-4f9c-8956-50b0da4a7068" }

我们解析生成的文本,其中包括生成的 AI 结果以及一些有关时间的有用元数据。

Slack 发布的结果如下:

“您可以使用 Apache NiFi 以多种方式将生成的 AI 模型集成到其中:

  1. 数据预处理:使用 NiFi 在将其输入到生成的 AI 模型之前对数据进行预处理。这可以包括数据清理、转换和特征工程。
  2. 模型训练:使用 NiFi 自动化生成的 AI 模型的训练过程。您可以使用 NiFi 的 PutFile 和 PutFile_SFTP 处理器将训练数据写入文件,然后使用类似 ExecuteScript 的处理器运行训练脚本。
  3. 模型部署:生成 AI 模型训练完成后,您可以使用 NiFi 进行部署。您可以创建一个 NiFi 流来接收输入数据,将其通过生成的 AI 模型进行处理,然后输出生成的数据。
  4. 实时推理:您可以使用 NiFi 的 StreamingJobs”

Slackbot 发布结果后,它还会将度量和调试信息发布到chat 频道。

所有元数据都会发布到另一个 Slack 频道供管理员监控。

==== NiFi 到 IBM WatsonX.AI LLM AnswersOn 日期:2023 年 11 月 15 日 15:43:29 GMT 创建时间:2023 年 11 月 15 日 15:43:29.248Z 提示:Q:如何提高生成的 AI 和 Apache NiFi 的集成?回答:您可以使用 Apache NiFi 以多种方式将生成的 AI 模型集成到其中:1. 数据预处理:使用 NiFi 在将其输入到生成的 AI 模型之前对数据进行预处理。这可以包括数据清理、转换和特征工程。2. 模型训练:使用 NiFi 自动化生成的 AI 模型的训练过程。您可以使用 NiFi 的 PutFile 和 PutFile_SFTP 处理器将训练数据写入文件,然后使用类似 ExecuteScript 的处理器运行训练脚本。3. 模型部署:生成 AI 模型训练完成后,您可以使用 NiFi 进行部署。您可以创建一个 NiFi 流来接收输入数据,将其通过生成的 AI 模型进行处理,然后输出生成的数据。4. 实时推理:您可以使用 NiFi 的 StreamingJobsToken:200Req 时长:8153HTTP TX ID:89d71099-da23-4e7e-89f9-4e8f5620c0fbIBM 消息:此模型是受第三方许可证管理的非 IBM 产品,可能会施加使用限制和其他义务。使用此模型即表示您同意其条款,详细信息请参阅以下 URL。 URL:https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wxIBM 消息 ID:disclaimer_warningModel ID:meta-llama/llama-2-70b-chatStop 原因:max_tokensToken 计数:38TX ID:NGp0djg-c05f740f84f84b7c80f93f9da05aa756UUID:da0806cb-6133-4bf4-808e-1fbf419c09e3Corr ID:NGp0djg-c05f740f84f84b7c80f93f9da05aa756Global TX ID:20c3a9cf276c38bcdaf26e3c27d0479bService 时间:478Request ID:03c2726a-dcb6-407f-96f1-f83f20fe9c9c文件名:1a3c4386-86d2-4969-805b-37649c16addbRequest 时长:8153Request URL:https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29cf-ray :82689bfd28e48ce2-EWR=====

 

制作您自己的Slackbot

Slack输出

Kafka分发

CREATE TABLE `ssb`.`Meetups`.`watsonairesults` (`date` VARCHAR(2147483647),  `x_global_transaction_id` VARCHAR(2147483647),  `x_request_id` VARCHAR(2147483647),  `cf_ray` VARCHAR(2147483647),  `inputs` VARCHAR(2147483647),  `created_at` VARCHAR(2147483647),  `stop_reason` VARCHAR(2147483647),  `x_correlation_id` VARCHAR(2147483647),  `x_proxy_upstream_service_time` VARCHAR(2147483647),  `message_id` VARCHAR(2147483647),  `model_id` VARCHAR(2147483647),  `invokehttp_request_duration` VARCHAR(2147483647),  `message` VARCHAR(2147483647),  `uuid` VARCHAR(2147483647),  `generated_text` VARCHAR(2147483647),  `transaction_id` VARCHAR(2147483647),  `tokencount` VARCHAR(2147483647),  `generated_token` VARCHAR(2147483647),  `ts` VARCHAR(2147483647),  `advisoryId` VARCHAR(2147483647),  `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',  WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND) WITH ('deserialization.failure.policy' = 'ignore_and_log',  'properties.request.timeout.ms' = '120000',  'format' = 'json',  'properties.bootstrap.servers' = 'kafka:9092',  'connector' = 'kafka',  'properties.transaction.timeout.ms' = '900000',  'topic' = 'watsonxaillmanswers',  'scan.startup.mode' = 'group-offsets',  'properties.auto.offset.reset' = 'earliest',  'properties.group.id' = 'watsonxaillmconsumer')CREATE TABLE `ssb`.`Meetups`.`watsonxresults` (`date` VARCHAR(2147483647),  `x_global_transaction_id` VARCHAR(2147483647),  `x_request_id` VARCHAR(2147483647),  `cf_ray` VARCHAR(2147483647),  `inputs` VARCHAR(2147483647),  `created_at` VARCHAR(2147483647),  `stop_reason` VARCHAR(2147483647),  `x_correlation_id` VARCHAR(2147483647),  `x_proxy_upstream_service_time` VARCHAR(2147483647),  `message_id` VARCHAR(2147483647),  `model_id` VARCHAR(2147483647),  `invokehttp_request_duration` VARCHAR(2147483647),  `message` VARCHAR(2147483647),  `uuid` VARCHAR(2147483647),  `generated_text` VARCHAR(2147483647),  `transaction_id` VARCHAR(2147483647),  `tokencount` VARCHAR(2147483647),  `generated_token` VARCHAR(2147483647),  `ts` VARCHAR(2147483647),  `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',  WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND) WITH ('deserialization.failure.policy' = 'ignore_and_log',  'properties.request.timeout.ms' = '120000',  'format' = 'json',  'properties.bootstrap.servers' = 'kafka:9092',  'connector' = 'kafka',  'properties.transaction.timeout.ms' = '900000',  'topic' = 'watsonxaillm',  'scan.startup.mode' = 'group-offsets',  'properties.auto.offset.reset' = 'earliest',  'properties.group.id' = 'allwatsonx1')

示例提示

{"inputs":"请回答以下问题。美国的首都是什么?"}

IBM DB2 SQL

alter table  "DB2INST1"."TRAVELADVISORY"add column "summary" VARCHAR(2048);-- DB2INST1.TRAVELADVISORY定义CREATE TABLE "DB2INST1"."TRAVELADVISORY"  ("TITLE" VARCHAR(250 OCTETS) ,   "PUBDATE" VARCHAR(250 OCTETS) ,   "LINK" VARCHAR(250 OCTETS) ,   "GUID" VARCHAR(250 OCTETS) ,   "ADVISORYID" VARCHAR(250 OCTETS) ,   "DOMAIN" VARCHAR(250 OCTETS) ,   "CATEGORY" VARCHAR(4096 OCTETS) ,   "DESCRIPTION" VARCHAR(4096 OCTETS) ,   "UUID" VARCHAR(250 OCTETS) NOT NULL ,   "TS" BIGINT NOT NULL ,   "summary" VARCHAR(2048 OCTETS) )    IN "IBMDB2SAMPLEREL"   ORGANIZE BY ROW;ALTER TABLE "DB2INST1"."TRAVELADVISORY"  ADD PRIMARY KEY  ("UUID") ENFORCED;GRANT CONTROL ON TABLE "DB2INST1"."TRAVELADVISORY" TO USER "DB2INST1";GRANT CONTROL ON INDEX "SYSIBM  "."SQL230620142604860" TO USER "DB2INST1";SELECT "summary", TITLE , ADVISORYID , TS, PUBDATE  FROM DB2INST1.TRAVELADVISORY t WHERE "summary" IS NOT NULL ORDER BY ts DESC 

示例输出电子邮件

视频

源代码