卡夫卡事件流AI和自动化
卡夫卡神秘事件:AI与自动化的前沿探索
Apache Kafka已成为企业架构中从数据静态(数据库事务)转向事件流处理的明确领导者。有很多演示介绍了Kafka的工作原理以及如何扩展这个技术栈(无论是本地还是云端)。使用ChatGPT构建一个微服务来消费消息,并进行丰富、转换和持久化是项目的下一阶段。在这个例子中,我们将从一个物联网设备(Raspberry Pi)接收输入,该设备每隔几秒发送一个JSON温度读数。
消费消息
每个Kafka事件消息被生产(并记录)后,一个Kafka微服务消费者会准备处理每个消息。我让ChatGPT生成了一些Python代码,它给了我基本的轮询和从命名的”topic”读取的逻辑。我得到了一个相当不错的开始来消费主题、键和JSON载荷。ChatGPT生成的代码使用SQLAlchemy将其持久化到数据库中。然后,我想转换JSON负载并使用API Logic Server(ALS – GitHub上的开源项目)规则对JSON进行解包、验证、计算,并基于源温度是否在给定范围之外生成一组新的消息载荷。
注意:ChatGPT选择了Confluent Kafka库(并使用了它们的Docker Kafka容器),你可以修改你的代码来使用其他Python Kafka库。
SQLAlchemy模型
使用API Logic Server(ALS:一个Python开源平台),我们连接到一个MySQL数据库。ALS将读取表格并创建一个SQLAlchemy ORM模型,一个react-admin用户界面,safrs-JSON Open API(Swagger),以及每个ORM终端点的运行REST web服务。新的Temperature表将保存时间戳、物联网设备ID和温度读数。在这里,我们使用ALS命令行工具来创建ORM模型:
API Logic Server生成的用于保存我们的Temperature
值的类。
变更
因此,不再将Kafka JSON消费者消息再次保存在SQL数据库中(并触发规则来完成工作),我们解开JSON负载(util.row_to_entity
)并将其插入到Temperature表中,而不是保存JSON负载。我们让声明式规则处理每个温度读数。
当消费者接收到消息时,它将将其添加到会话中,这将触发commit_event
规则(如下所示)。
声明式逻辑:生成消息
使用API Logic Server(一种使用SQLAlchemy、Flask和LogicBank类似于电子表格的规则引擎:公式、求和、计数、复制、约束、事件等)在ORM实体Temperature
上添加一个声明式commit_event
规则。每当消息被持久化到Temperature表中时,都会调用commit_event
规则。如果温度读数超过MAX_TEMP
或小于MIN_TEMP
,我们将在主题“TempRangeAlert”
上发送一条Kafka消息。我们还添加了一个约束条件,以确保我们接收到的数据在正常范围内(32
–132
)。我们将让另一个事件消费者处理警报消息。
只有当温度读数大于MAX_TEMP
或小于MIN_TEMP
时才生成警报消息。约束将在调用提交事件之前检查温度范围(请注意,规则始终是无序的,并且可以根据规范的变化进行引入)。
TDD Behave测试
使用 TDD (测试驱动开发),我们可以编写一个Behave测试将记录直接插入到Temperature表中,并检查返回值KafkaMessageSent
。Behave从一个Feature
/Scenario
(.feature文件)开始。对于每个场景,我们使用Behave
装饰器编写相应的Python类。
特性定义
TDD Python类
摘要
使用ChatGPT生成Kafka消息代码,用于Consumer和Producer似乎是一个很好的起点。安装Confluent Kafka的Docker。使用API Logic Server进行声明性逻辑规则允许我们向正常事务流程中添加公式、约束和事件,并且生成(和转换)新的Kafka消息是一个很好的组合。ChatGPT和声明性逻辑是“配对编程”的下一个级别。