通过Amazon SageMaker优化Talent.com的ETL数据处理 (Talent.com在Amazon SageMaker上简化ETL数据处理流程)

利用Amazon SageMaker优化Talent.com的ETL数据处理流程(Talent.com简化ETL数据处理流程的Amazon SageMaker应用)

本文由机器学习工程师Anatoly Khomenko和首席技术官Abdenour Bezzouh在Talent.com合著。

成立于2011年,Talent.com收集有偿的职位招聘信息以及公开的职位招聘信息,并创建了一个统一且易于搜索的平台。Talent.com涵盖了超过75个国家的3000万个职位招聘信息,涉及多种语言、行业和分发渠道,满足求职者多样化的需求,有效地将数百万求职者与工作机会联系起来。

Talent.com的使命是促进全球劳动力的连接。为实现这一目标,Talent.com从网络上的多个来源收集职位招聘信息,为求职者提供超过3000万个根据其技能和经验量身定制的工作机会。为配合这一使命,Talent.com与AWS合作开发了一款基于深度学习的领先职位推荐引擎,旨在帮助用户推动职业发展。

为确保职位推荐引擎的有效运作,关键是实施一个大规模数据处理管道,负责从Talent.com汇总的职位招聘信息中提取和精炼特征。该管道能够在不到1小时的时间内处理每天的500万条记录,并支持同时处理多天的记录。此外,该解决方案可以快速部署到生产环境。该管道的主要数据来源是存储在亚马逊简单存储服务(Amazon S3)中按日期分区的JSON Lines格式。每天会生成数以万计的JSON Lines文件,并进行日常增量更新。

该数据处理管道的主要目标是为Talent.com的职位推荐引擎的训练和部署提供必要的特征。值得注意的是,该管道必须支持增量更新,并满足训练和部署模块对职位推荐系统所需的复杂特征提取要求。我们的管道属于通用ETL(抽取、转换和加载)过程,将来自多个来源的数据合并到一个大型集中存储库中。

要获取关于Talent.com和AWS如何共同构建基于自然语言处理和深度学习模型训练技术的前沿见解,利用Amazon SageMaker构建职位推荐系统,参考从文本到梦想工作:如何在Talent.com利用Amazon SageMaker构建基于NLP的职位推荐引擎。该系统包括特征工程、深度学习模型架构设计、超参数优化和模型评估,所有模块均使用Python运行。

本文展示了如何使用SageMaker构建适用于Talent.com职位推荐引擎的大规模数据处理管道,并为其准备特征。所得到的解决方案使数据科学家可以在SageMaker Notebook中使用Python库(如Scikit-LearnPyTorch)构思特征提取,并快速将相同的代码部署到执行特征提取的数据处理管道中。与使用AWS Glue作为ETL解决方案时需要转移到PySpark上以使用特征提取代码不同,我们的解决方案不需要这样做。我们的解决方案可以完全由数据科学家一站式开发和部署,仅使用SageMaker,无需了解其他ETL解决方案,如AWS Batch。这可以显著缩短将机器学习(ML)管道部署到生产环境所需的时间。该管道通过Python操作,并与特征提取工作流程无缝集成,使其适用于各种数据分析应用。

解决方案概述

使用SageMaker处理的ETL管道概览

<p>管道由三个主要阶段组成:</p><ol type=”1″><li>利用<a href=”https://sihaiba.com/speed-up-ml-data-preparation-in-amazon-sagemaker-canvas.html”>Amazon SageMaker Processing</a>作业处理与指定日期相关的原始JSONL文件。可以同时处理多天的数据,使用单独的处理作业进行处理。</li><li>在处理多天的数据后,使用<a href=”https://sihaiba.com/awsbased-image-generation-app-using-generative-ai.html”>AWS Glue</a>进行数据爬取。</li><li>使用SQL从<a href=”https://www.xiaozhuai.com/building-a-data-pipeline-with-athena-and-mysql.html”>Amazon Athena</a>表加载指定日期范围的处理后的特征,然后训练和部署作业推荐模型。</li></ol><h2 id=”process-raw-jsonl-files”>处理原始JSONL文件</h2><p>我们使用SageMaker Processing作业处理指定日期的原始JSONL文件。该作业实施特征提取和数据压缩,并将处理后的特征保存为每个文件1百万条记录的Parquet文件。我们利用CPU并行性并行处理每个原始JSONL文件的特征提取。每个JSONL文件的处理结果保存在临时目录中的单独Parquet文件中。在处理完所有JSONL文件之后,我们将数千个小的Parquet文件压缩成每个文件1百万条记录的几个文件。压缩后的Parquet文件然后作为处理作业的输出上传到Amazon S3。数据压缩确保了下一个阶段的爬取和SQL查询的高效性。</p><p>以下是使用SageMaker SDK调度指定日期(例如2020-01-01)的SageMaker Processing作业的示例代码。作业从Amazon S3读取原始JSONL文件(例如从<code>s3://bucket/raw-data/2020/01/01</code>),并将压缩后的Parquet文件保存到Amazon S3(例如至<code>s3://bucket/processed/table-name/day_partition=2020-01-01/</code>)。</p><pre><code>### 安装依赖 %pip install sagemaker pyarrow s3fs awswranglerimport sagemakerimport boto3from sagemaker.processing import FrameworkProcessorfrom sagemaker.sklearn.estimator import SKLearnfrom sagemaker import get_execution_rolefrom sagemaker.processing import ProcessingInput, ProcessingOutputregion = boto3.session.Session().region_namerole = get_execution_role()bucket = sagemaker.Session().default_bucket()### 使用16个CPU和128 GiB内存的实例### 注意,脚本在压缩过程中不会将整个数据加载到内存中### 根据单个jsonl文件的大小,可能需要更大的实例instance = “ml.r5.4xlarge”n_jobs = 8 ### 使用8个处理进程date = “2020-01-01” ### 处理一个日期的数据est_cls = SKLearnframework_version_str = “0.20.0”### 调度处理脚本的作业script_processor = FrameworkProcessor( role=role, instance_count=1, instance_type=instance, estimator_cls=est_cls, framework_version=framework_version_str, volume_size_in_gb=500,)script_processor.run( code=”processing_script.py”, ### 主处理脚本的名称 source_dir=”../src/etl/”, ### 源代码目录的位置 ### 我们的处理脚本直接从S3加载原始jsonl文件 ### 这样可以避免处理作业的启动时间过长, ### 因为原始数据不需要复制到实例中 inputs=[], ### 处理作业的输入为空 outputs=[ ProcessingOutput(destination=”s3://bucket/processed/table-name/”, source=”/opt/ml/processing/output”), ], arguments=[ ### 作业输出的目录 “–output”, “/opt/ml/processing/output”, ### 实例内的临时目录 “–tmp_output”, “/opt/ml/tmp_output”, “–n_jobs”, str(n_jobs), ### 处理进程数 “–date”, date, ### 要处理的日期 ### S3中存储原始jsonl文件的位置 “–path”, “s3://bucket/raw-data/”, ], wait=False)</code></pre><p>主处理脚本(<code>processing_script.py</code>)运行SageMaker Processing作业的代码大纲如下:</p><pre><code>import concurrentimport pyarrow.dataset as dsimport osimport s3fsfrom pathlib import Path### 处理原始jsonl文件并将提取的特征保存到parquet文件的函数,来自process_data模块 from process_data import process_jsonl###解析命令行参数args=parse_args()###我们使用s3fs来爬取S3输入路径中的原始jsonl文件fs=s3fs.S3FileSystem()###我们假设原始jsonl文件存储在S3目录中,按日期分区###例如:s3://bucket/raw-data/2020/01/01/jsons=fs.find(os.path.join(args.path,*args.date.split(‘-‘)))###处理作业实例中的临时目录位置tmp_out=os.path.join(args.tmp_output,f”day_partition={args.date}”)###作业输出目录位置out_dir=os.path.join(args.output,f”day_partition={args.date}”)###使用n_jobs个处理工作进程并行处理各个jsonl文件futures=[]with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor: for file in jsons: inp_file=Path(file) out_file=os.path.join(tmp_out,inp_file.stem+”.snappy.parquet”) ### process_jsonl函数从S3位置(inp_file)读取原始jsonl文件 ### 并将结果保存到临时目录(tmp_out)中的parquet文件(out_file) futures.append(executor.submit(process_jsonl, file, out_file)) ###等待所有jsonl文件处理完毕 for future in concurrent.futures.as_completed(futures): result=future.result()###压缩parquet文件dataset=ds.dataset(tmp_out)if len(dataset.schema)>0: ### 保存有1百万条记录的压缩parquet文件 ds.write_dataset(dataset,out_dir,format=”parquet”, max_rows_per_file=1024*1024)</code></pre>

可伸缩性是我们的流水线的一个关键特性。首先,多个SageMaker处理作业可以同时处理几天的数据。其次,在处理每天指定的数据时,我们避免加载整个已处理或原始数据到内存中。这使得我们可以使用无法容纳一整天数据的主存的实例类型来处理数据。唯一的要求是实例类型应该可以同时将N个原始JSONL或已处理的Parquet文件加载到内存中,其中N是正在使用的处理工作器的数量。

使用AWS Glue爬取已处理数据

在多天的原始数据都被处理后,我们可以使用AWS Glue爬虫从整个数据集创建一个Athena表。我们使用AWS SDK for pandas (awswrangler)库使用以下代码片段创建表:

import awswrangler as wr### 用S3路径爬取已处理的数据res = wr.s3.store_parquet_metadata(    path='s3://bucket/processed/table-name/',    database="database_name",    table="table_name",    dataset=True,    mode="overwrite",    sampling=1.0,    path_suffix='.parquet',)### 打印表模式print(res[0])

加载已处理特征进行训练

现在,可以使用SQL从Athena表中加载指定日期范围的已处理特征,然后可以将这些特征用于训练工作推荐模型。例如,以下代码片段使用awswrangler库将一个月的已处理特征加载到DataFrame中:

import awswrangler as wrquery = """    SELECT *     FROM table_name    WHERE day_partition BETWEN '2020-01-01' AND '2020-02-01' """### 从database_name.table_name加载一个月的数据到DataFrame中df = wr.athena.read_sql_query(query, database='database_name')

此外,使用SQL加载已处理特征进行训练可以扩展以适应各种其他用例。例如,我们可以应用类似的流水线来维护两个单独的Athena表:一个用于存储用户印象,另一个用于存储用户对这些印象的点击。使用SQL连接语句,我们可以检索用户点击或未点击的印象,然后将这些印象传递给模型训练作业。

解决方案的优势

实施建议的解决方案为我们现有的工作流程带来了几个优势,包括:

  • 简化的实施 – 该解决方案使得使用常用的机器学习库在Python中实现特征提取变得简单。而且,它不需要将代码移植到PySpark中。这使得特征提取更加简单,因为数据科学家在笔记本中开发的相同代码将由这个流水线执行。
  • 快速的上线路径 – 数据科学家可以开发和部署该解决方案以在大规模上执行特征提取,从而使他们能够根据这些数据开发一个ML推荐模型。与此同时,ML工程师几乎不需要进行修改,就可以将相同的解决方案部署到生产环境中。
  • 可重用性 – 该解决方案提供了一个可重用的模式,用于大规模特征提取,并且可以很容易地适应其他用例,超越了构建推荐模型。
  • 高效性 – 该解决方案具有良好的性能:处理Talent.com的数据的一个工作日不到1小时。
  • 增量更新 – 该解决方案还支持增量更新。新的每日数据可以通过SageMaker处理作业进行处理,然后可以重新爬取包含已处理数据的S3位置以更新Athena表。我们还可以使用cron作业每天更新几次数据(例如,每隔3小时一次)。

我们使用这个ETL流水线帮助Talent.com处理每天包含50000个文件的数据,其中包含500万条记录,并使用从Talent.com的90天原始数据提取的特征创建训练数据,总共有450万条记录,跨900,000个文件。我们的流水线帮助Talent.com在短短2周内构建并部署推荐系统到生产环境中。该解决方案在Amazon SageMaker上执行了所有的ML过程,而没有使用其他AWS服务。作业推荐系统相比以前基于XGBoost的解决方案在在线A/B测试中提高了8.6%的点击率,帮助数百万Talent.com用户连接到更好的工作。

结论

这篇文章概述了我们在Talent.com为培训和部署职位推荐模型开发的ETL流水线。我们的流水线使用SageMaker处理作业进行高效的数据处理和大规模特征提取。特征提取代码使用Python实现,可利用流行的机器学习库在大规模上进行特征提取,无需重新编写代码以使用PySpark。

我们鼓励读者探索在需要大规模特征提取的用例中使用本博客中提出的流水线作为模板的可能性。数据科学家可以利用该流水线构建机器学习模型,然后该流水线可以被机器学习工程师用于生产环境。这可以显著减少将机器学习解决方案推向市场所需的时间,就像Talent.com的情况一样。读者可以参考设置和运行SageMaker处理作业的教程。我们还建议读者查看文章通过文本找到梦想工作:在Talent.com上构建基于NLP的职位推荐器,在该文章中我们讨论了利用Amazon SageMaker构建Talent.com职位推荐系统的深度学习模型训练技术。