用生成AI构建实时Slackbot
如何利用生成AI构建实时Slackbot
在这篇文章中,我将向您展示如何使用由Apache NiFi提供支持的Cloudera DataFlow实时与IBM WatsonX.AI基础大型语言模型进行交互。我们可以与任何基础模型一起工作,如Google FLAN T5 XXL或IBM Granite模型。
我将向您展示构建实时数据流水线的简易方法,将问题直接提供给在IBM Cloud上运行的安全WatsonX.AI模型,用于供应用程序如Slack和移动应用程序实时使用。我们将使用Cloudera Data Flow来处理所有的安全性、管理、谱系和治理。作为决策的一部分,我们可以根据提示的类型动态选择不同的WatsonX.AI模型。例如,如果我们想继续一个句子而不是回答问题,我可以选择不同的模型。对于回答问题,Google FLAN T5 XXL效果很好。如果我想继续句子,我会使用IBM Granite模型之一。
您会注意到WatsonX.AI模型返回我们所需结果的速度之快。我会进行一些快速的丰富和转换,然后将它们发送到Cloudera Apache Kafka,用于连续的分析和分发给许多其他应用程序、系统、平台和下游消费者。我们也会将答案输出给最初的请求者,这可以是Slack频道中的某个人或应用程序中的某个人。所有这些都是实时发生的,无需编码,完全具备治理、谱系、数据管理和安全性,并且可以实现任何规模和任何平台。
IBM和Cloudera在私有、公共和混合云环境中结合在一起,用于实时数据和人工智能,这只是一个开始。立即尝试吧。
逐步实时流程
首先,在Slack中,我输入一个问题:
“Q:集成生成式人工智能和Apache NiFi的好方法是什么?”
NiFi流程顶部
一旦输入了该问题,Slack服务器就会将这些事件发送到我们的注册服务。这可以在任何面向公众的地方托管。
- (点击此处获取Slack API链接)
Slack API
启用后,您的服务器将开始接收每个Slack帖子的JSON事件。这在NiFi中接收和解析非常容易。即使在设计模式下,Cloudera DataFlow在公有云托管版中也能轻松接收安全的HTTPS REST调用。
NiFi流程顶部2
在流程的第一部分,我们收到了REST JSON帖子,内容如下所示。
Slackbot 1.0 (+https://api.slack.com/robots)application/jsonPOSTHTTP/1.1{ "token" : "qHvJe59yetAp1bao6wmQzH0C", "team_id" : "T1SD6MZMF", "context_team_id" : "T1SD6MZMF", "context_enterprise_id" : null, "api_app_id" : "A04U64MN9HS", "event" : { "type" : "message", "subtype" : "bot_message", "text" : "==== NiFi to IBM <http://WatsonX.AI|WatsonX.AI> LLM Answers\n\nOn Date: Wed, 15 Nov 20
这是一个非常丰富详细的JSON文件,我们可以将其立即作为原始数据推送到Apache Iceberg Open Cloud Lakehouse、Kafka主题或对象存储中作为JSON文档(增强选项)。我只是要解析我需要的部分。
EvaluateJSONPath
我们解析出频道ID和帖子的纯文本。我只想要来自general(“C1SD6N197”)的消息。然后,我将文本复制到一个输入字段中,这是 Hugging Face所需的。
我们检查输入:如果是股票或天气(还会有更多内容),我们避免调用LLM。
SELECT * FROM FLOWFILEWHERE upper(inputs) like '%WEATHER%'AND not upper(inputs) like '%LLM SKIPPED%'SELECT * FROM FLOWFILEWHERE upper(inputs) like '%STOCK%'AND not upper(inputs) like '%LLM SKIPPED%'SELECT * FROM FLOWFILEWHERE (upper(inputs) like 'QUESTION:%'OR upper(inputs) like 'Q:%') and not upper(inputs) like '%WEATHER%'and not upper(inputs) like '%STOCK%'
对于股票处理:
为了解析我们需要的股票,我使用了我的Open NLP处理器。
所以你需要下载处理器和实体提取模型。
然后我们将公司名称传递给AlphaVantage的HTTP REST端点,将公司名称转换为股票符号。在免费账户中,每天只有几次调用,所以如果失败了,我们就会绕过这一步,尝试使用您传递的任何内容。
使用RouteOnContent过滤错误消息。
然后我们使用QueryRecord处理器将CSV转换为JSON并进行过滤。
SELECT name as companyName, symbol FROM FLOWFILEORDER BY matchScore DESCLIMIT 1
我们使用SplitRecord确保只有一条记录。然后我们运行EvaluateJsonPath将字段作为属性获取。
在UpdateAttribute中,我们修剪股票符号,以防万一。
${stockSymbol:trim()}
然后我们将该股票符号传递给Twelve Data通过InvokeHTTP获取我们的股票数据。
然后我们得到了很多股票数据。
{ "meta" : { "symbol" : "IBM", "interval" : "1min", "currency" : "USD", "exchange_timezone" : "America/New_York", "exchange" : "NYSE", "mic_code" : "XNYS", "type" : "Common Stock" }, "values" : [ { "datetime" : "2023-11-15 10:37:00", "open" : "152.07001", "high" : "152.08000", "low" : "151.99500", "close" : "152.00999", "volume" : "8525" }, { "datetime" : "2023-11-15 10:36:00", "open" : "152.08501", "high" : "152.12250", "low" : "152.08000", "close" : "152.08501", "volume" : "15204" } ...
然后我们运行EvaluateJSONPath抓取交易所信息。
我们分叉记录以只获得一条记录,这只是返回给Slack。我们使用UpdateRecord调用来丰富股票数据和其他值。然后我们运行QueryRecord将我们限制在1条记录以发送到Slack。
SELECT * FROM FLOWFILEORDER BY 'datetime' DESCLIMIT 1
我们运行一个EvaluateJsonPath来获取最有价值的字段进行显示。
然后我们运行一个PutSlack,带上我们的消息。
LLM 跳过。${companyName} [${nlp_org_1}/${stockSymbol}] 的股票价值在${date}为${closeStockValue}。股票日期${stockdateTime}。股票交易所${exchange}
我们还有一个与公司名称分开的单独流程。
在第一步中,我们调用Yahoo Finance获取该股票的RSS头条新闻。
https://feeds.finance.yahoo.com/rss/2.0/headline?s=${stockSymbol:trim()}®ion=US&lang=en-US
我们使用QueryRecord将RSS/XML记录转换为JSON。
然后我们运行SplitJSON来拆分新闻项。
我们运行SplitRecord以限制为一条记录。我们使用EvaluateJSONPath获取我们的Slack消息所需的字段。
然后我们运行UpdateRecord以完成我们的JSON。
然后我们将此消息发送到Slack。
LLM 跳过。${companyName} [${nlp_org_1}/${stockSymbol}] 的股票新闻信息在${date}${title} :${description}。${guid}文章日期${pubdate}
对于那些选择天气的人,我们会遵循类似的路线(我们应该在Aiven中使用Redis进行缓存),与股票相似。我们使用我的OpenNLP处理器来提取您可能想知道天气的地点。
下一步是使用处理器的输出构建要发送给我们的地理编码器的值。
weatherlocation = ${nlp_location_1:notNull():ifElse(${nlp_location_1}, "纽约市")}
如果我们找不到有效的位置,我会说“纽约市”。我们可以使用其他一些查询。我正在加载所有位置的工作,可以在其中进行一些高级的PostgreSQL搜索,或者可能是OpenSearch或矢量化数据存储。
我把那个位置传给Open Meteo,通过InvokeHTTP找到地理位置。
https://geocoding-api.open-meteo.com/v1/search?name=${weatherlocation:trim():urlEncode()}&count=1&language=en&format=json
然后,我们从结果中解析我们需要的值。
{ "results" : [ { "id" : 5128581, "name" : "纽约", "latitude" : 40.71427, "longitude" : -74.00597, "elevation" : 10.0, "feature_code" : "PPL", "country_code" : "US", "admin1_id" : 5128638, "timezone" : "America/New_York", "population" : 8175133, "postcodes" : [ "10001", "10002", "10003", "10004", "10005", "10006", "10007", "10008", "10009", "10010", "10011", "10012", "10013", "10014", "10016", "10017", "10018", "10019", "10020", "10021", "10022", "10023", "10024", "10025", "10026", "10027", "10028", "10029", "10030", "10031", "10032", "10033", "10034", "10035", "10036", "10037", "10038", "10039", "10040", "10041", "10043", "10044", "10045", "10055", "10060", "10065", "10069", "10080", "10081", "10087", "10090", "10101", "10102", "10103", "10104", "10105", "10106", "10107", "10108", "10109", "10110", "10111", "10112", "10113", "10114", "10115", "10116", "10117", "10118", "10119", "10120", "10121", "10122", "10123", "10124", "10125", "10126", "10128", "10129", "10130", "10131", "10132", "10133", "10138", "10150", "10151", "10152", "10153", "10154", "10155", "10156", "10157", "10158", "10159", "10160", "10161", "10162", "10163", "10164", "10165", "10166", "10167", "10168", "10169", "10170", "10171", "10172", "10173", "10174", "10175", "10176", "10177", "10178", "10179", "10185", "10199", "10203", "10211", "10212", "10213", "10242", "10249", "10256", "10258", "10259", "10260", "10261", "10265", "10268", "10269", "10270", "10271", "10272", "10273", "10274", "10275", "10276", "10277", "10278", "10279", "10280", "10281", "10282", "10285", "10286" ], "country_id" : 6252001, "country" : "美国", "admin1" : "纽约" } ], "generationtime_ms" : 0.92196465}
然后我们解析结果,以便通过InvokeHTTP调用另一个API获取该纬度和经度的当前天气。
https://api.weather.gov/points/${latitude:trim()},${longitude:trim()}
结果是地理JSON。
{ "@context": [ "https://geojson.org/geojson-ld/geojson-context.jsonld", { "@version": "1.1", "wx": "https://api.weather.gov/ontology#", "s": "https://schema.org/", "geo": "http://www.opengis.net/ont/geosparql#", "unit": "http://codes.wmo.int/common/unit/", "@vocab": "https://api.weather.gov/ontology#", "geometry": { "@id": "s:GeoCoordinates", "@type": "geo:wktLiteral" }, "city": "s:addressLocality", "state": "s:addressRegion", "distance": { "@id": "s:Distance", "@type": "s:QuantitativeValue" }, "bearing": { "@type": "s:QuantitativeValue" }, "value": { "@id": "s:value" }, "unitCode": { "@id": "s:unitCode", "@type": "@id" }, "forecastOffice": { "@type": "@id" }, "forecastGridData": { "@type": "@id" }, "publicZone": { "@type": "@id" }, "county": { "@type": "@id" } } ], "id": "https://api.weather.gov/points/40.7143,-74.006", "type": "Feature", "geometry": { "type": "Point", "coordinates": [ -74.006, 40.714300000000001 ] }, "properties": { "@id": "https://api.weather.gov/points/40.7143,-74.006", "@type": "wx:Point", "cwa": "OKX", "forecastOffice": "https://api.weather.gov/offices/OKX", "gridId": "OKX", "gridX": 33, "gridY": 35, "forecast": "https://api.weather.gov/gridpoints/OKX/33,35/forecast", "forecastHourly": "https://api.weather.gov/gridpoints/OKX/33,35/forecast/hourly", "forecastGridData": "https://api.weather.gov/gridpoints/OKX/33,35", "observationStations": "https://api.weather.gov/gridpoints/OKX/33,35/stations", "relativeLocation": { "type": "Feature", "geometry": { "type": "Point", "coordinates": [ -74.0279259, 40.745251000000003 ] }, "properties": { "city": "Hoboken", "state": "NJ", "distance": { "unitCode": "wmoUnit:m", "value": 3906.1522008034999 }, "bearing": { "unitCode": "wmoUnit:degree_(angle)", "value": 151 } } }, "forecastZone": "https://api.weather.gov/zones/forecast/NYZ072", "county": "https://api.weather.gov/zones/county/NYC061", "fireWeatherZone": "https://api.weather.gov/zones/fire/NYZ212", "timeZone": "America/New_York", "radarStation": "KDIX" }}
我们使用EvaluateJSONPath获取天气预报URL。
然后通过invokeHTTP调用该天气预报URL。
这会产生一个更大的JSON输出,我们将解析该输出以获取要返回给Slack的结果。
我们使用EvaluateJSONPath解析数据来获取天气的主要字段。
然后将这些字段格式化为PutSlack。
LLM Skipped. Read forecast on ${date} for ${weatherlocation} @ ${latitude},${longitude}Used ${forecasturl} ${icon} Temp: ${temperature} ${temperatureunit} - ${temperaturetrend}There is a wind ${winddirection} at ${windspeed}. ${detailedforecast}
Slack 输出
如果我们有一个 LLM 问题,让我们确保只记录一条。
我们使用几个不同的模型,这些模型可以在 IBM 云上的 IBM WatsonX.AI 上快速访问我们的 REST 提示。
我最初在 IBM 的 Prompt Lab 中测试和构建了这些提示,然后从那里复制了最初的 curl 语句。
点击这里查看 与 IBM watsonx.ai 可用的基础模型支持。
ibm/mpt-7b-instruct2meta-llama/llama-2–70b-chatibm/granite-13b-chat-v1
我们需要将我们独特的安全密钥发送给 IBM,他们会给我们一个令牌来在下一步通话中使用。
我们解析问题然后通过 REST API 将其发送给 WatsonX。
我们构建的提示如下。
{ "model_id": "meta-llama/llama-2-70b-chat", "input": "${inputs:urlEncode()}", "parameters": { "decoding_method": "greedy", "max_new_tokens": 200, "min_new_tokens": 50, "stop_sequences": [], "repetition_penalty": 1 }, "project_id": "0ead8ec4-d137-4f9c-8956-50b0da4a7068" }
我们解析生成的文本,其中包括生成的 AI 结果以及一些有关时间的有用元数据。
Slack 发布的结果如下:
“您可以使用 Apache NiFi 以多种方式将生成的 AI 模型集成到其中:
- 数据预处理:使用 NiFi 在将其输入到生成的 AI 模型之前对数据进行预处理。这可以包括数据清理、转换和特征工程。
- 模型训练:使用 NiFi 自动化生成的 AI 模型的训练过程。您可以使用 NiFi 的 PutFile 和 PutFile_SFTP 处理器将训练数据写入文件,然后使用类似 ExecuteScript 的处理器运行训练脚本。
- 模型部署:生成 AI 模型训练完成后,您可以使用 NiFi 进行部署。您可以创建一个 NiFi 流来接收输入数据,将其通过生成的 AI 模型进行处理,然后输出生成的数据。
- 实时推理:您可以使用 NiFi 的 StreamingJobs”
Slackbot 发布结果后,它还会将度量和调试信息发布到chat 频道。
所有元数据都会发布到另一个 Slack 频道供管理员监控。
==== NiFi 到 IBM WatsonX.AI LLM AnswersOn 日期:2023 年 11 月 15 日 15:43:29 GMT 创建时间:2023 年 11 月 15 日 15:43:29.248Z 提示:Q:如何提高生成的 AI 和 Apache NiFi 的集成?回答:您可以使用 Apache NiFi 以多种方式将生成的 AI 模型集成到其中:1. 数据预处理:使用 NiFi 在将其输入到生成的 AI 模型之前对数据进行预处理。这可以包括数据清理、转换和特征工程。2. 模型训练:使用 NiFi 自动化生成的 AI 模型的训练过程。您可以使用 NiFi 的 PutFile 和 PutFile_SFTP 处理器将训练数据写入文件,然后使用类似 ExecuteScript 的处理器运行训练脚本。3. 模型部署:生成 AI 模型训练完成后,您可以使用 NiFi 进行部署。您可以创建一个 NiFi 流来接收输入数据,将其通过生成的 AI 模型进行处理,然后输出生成的数据。4. 实时推理:您可以使用 NiFi 的 StreamingJobsToken:200Req 时长:8153HTTP TX ID:89d71099-da23-4e7e-89f9-4e8f5620c0fbIBM 消息:此模型是受第三方许可证管理的非 IBM 产品,可能会施加使用限制和其他义务。使用此模型即表示您同意其条款,详细信息请参阅以下 URL。 URL:https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wxIBM 消息 ID:disclaimer_warningModel ID:meta-llama/llama-2-70b-chatStop 原因:max_tokensToken 计数:38TX ID:NGp0djg-c05f740f84f84b7c80f93f9da05aa756UUID:da0806cb-6133-4bf4-808e-1fbf419c09e3Corr ID:NGp0djg-c05f740f84f84b7c80f93f9da05aa756Global TX ID:20c3a9cf276c38bcdaf26e3c27d0479bService 时间:478Request ID:03c2726a-dcb6-407f-96f1-f83f20fe9c9c文件名:1a3c4386-86d2-4969-805b-37649c16addbRequest 时长:8153Request URL:https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29cf-ray :82689bfd28e48ce2-EWR=====
制作您自己的Slackbot
Slack输出
Kafka分发
Apache Flink SQL表创建DDL
CREATE TABLE `ssb`.`Meetups`.`watsonairesults` (`date` VARCHAR(2147483647), `x_global_transaction_id` VARCHAR(2147483647), `x_request_id` VARCHAR(2147483647), `cf_ray` VARCHAR(2147483647), `inputs` VARCHAR(2147483647), `created_at` VARCHAR(2147483647), `stop_reason` VARCHAR(2147483647), `x_correlation_id` VARCHAR(2147483647), `x_proxy_upstream_service_time` VARCHAR(2147483647), `message_id` VARCHAR(2147483647), `model_id` VARCHAR(2147483647), `invokehttp_request_duration` VARCHAR(2147483647), `message` VARCHAR(2147483647), `uuid` VARCHAR(2147483647), `generated_text` VARCHAR(2147483647), `transaction_id` VARCHAR(2147483647), `tokencount` VARCHAR(2147483647), `generated_token` VARCHAR(2147483647), `ts` VARCHAR(2147483647), `advisoryId` VARCHAR(2147483647), `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp', WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND) WITH ('deserialization.failure.policy' = 'ignore_and_log', 'properties.request.timeout.ms' = '120000', 'format' = 'json', 'properties.bootstrap.servers' = 'kafka:9092', 'connector' = 'kafka', 'properties.transaction.timeout.ms' = '900000', 'topic' = 'watsonxaillmanswers', 'scan.startup.mode' = 'group-offsets', 'properties.auto.offset.reset' = 'earliest', 'properties.group.id' = 'watsonxaillmconsumer')CREATE TABLE `ssb`.`Meetups`.`watsonxresults` (`date` VARCHAR(2147483647), `x_global_transaction_id` VARCHAR(2147483647), `x_request_id` VARCHAR(2147483647), `cf_ray` VARCHAR(2147483647), `inputs` VARCHAR(2147483647), `created_at` VARCHAR(2147483647), `stop_reason` VARCHAR(2147483647), `x_correlation_id` VARCHAR(2147483647), `x_proxy_upstream_service_time` VARCHAR(2147483647), `message_id` VARCHAR(2147483647), `model_id` VARCHAR(2147483647), `invokehttp_request_duration` VARCHAR(2147483647), `message` VARCHAR(2147483647), `uuid` VARCHAR(2147483647), `generated_text` VARCHAR(2147483647), `transaction_id` VARCHAR(2147483647), `tokencount` VARCHAR(2147483647), `generated_token` VARCHAR(2147483647), `ts` VARCHAR(2147483647), `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp', WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND) WITH ('deserialization.failure.policy' = 'ignore_and_log', 'properties.request.timeout.ms' = '120000', 'format' = 'json', 'properties.bootstrap.servers' = 'kafka:9092', 'connector' = 'kafka', 'properties.transaction.timeout.ms' = '900000', 'topic' = 'watsonxaillm', 'scan.startup.mode' = 'group-offsets', 'properties.auto.offset.reset' = 'earliest', 'properties.group.id' = 'allwatsonx1')
示例提示
{"inputs":"请回答以下问题。美国的首都是什么?"}
IBM DB2 SQL
alter table "DB2INST1"."TRAVELADVISORY"add column "summary" VARCHAR(2048);-- DB2INST1.TRAVELADVISORY定义CREATE TABLE "DB2INST1"."TRAVELADVISORY" ("TITLE" VARCHAR(250 OCTETS) , "PUBDATE" VARCHAR(250 OCTETS) , "LINK" VARCHAR(250 OCTETS) , "GUID" VARCHAR(250 OCTETS) , "ADVISORYID" VARCHAR(250 OCTETS) , "DOMAIN" VARCHAR(250 OCTETS) , "CATEGORY" VARCHAR(4096 OCTETS) , "DESCRIPTION" VARCHAR(4096 OCTETS) , "UUID" VARCHAR(250 OCTETS) NOT NULL , "TS" BIGINT NOT NULL , "summary" VARCHAR(2048 OCTETS) ) IN "IBMDB2SAMPLEREL" ORGANIZE BY ROW;ALTER TABLE "DB2INST1"."TRAVELADVISORY" ADD PRIMARY KEY ("UUID") ENFORCED;GRANT CONTROL ON TABLE "DB2INST1"."TRAVELADVISORY" TO USER "DB2INST1";GRANT CONTROL ON INDEX "SYSIBM "."SQL230620142604860" TO USER "DB2INST1";SELECT "summary", TITLE , ADVISORYID , TS, PUBDATE FROM DB2INST1.TRAVELADVISORY t WHERE "summary" IS NOT NULL ORDER BY ts DESC