如何更高效地存储历史数据
高效存储历史数据
使用PySpark的实践教程:只存储DataFrame的0.01%的行而不丢失任何信息。
在公司和组织收集的数据越来越多的时代,数据集往往会累积数百万条不包含任何新的或有价值信息的无用行。在本文中,我们将关注数据管理的一个关键方面:使用PySpark删除数据集中没有增加任何价值的行。
*在处理非常大的数据集时,使用PySpark而不是pandas,因为它可以跨多台计算机处理数据,使其更快速和可扩展。Pandas适用于可以在单台计算机的内存中容纳的较小数据集,但对于大数据可能会变慢甚至不切实际。
让我们想象以下情况:你在一个房地产公司的维护部门担任数据工程师/科学家的职位。在过去的十年里,你的公司从一个外部数据库中完整加载了所有维护数据,并将其存储在公司的云存储中。数据可能如下所示:
这个数据集中包含了三列:
id
-> 建筑物的ID。condition
-> 一个介于1(糟糕)和10(优秀)之间的整数,表示建筑物的状态。import_date
-> 表示导入此行的日期时间列。
要自己创建此数据集,请运行下面的代码片段:
from pyspark.sql import SparkSession, Rowfrom pyspark.sql.functions as ffrom pyspark.sql.types import IntegerType, DateTypeimport random# 设置随机数种子以在重新运行时获得相同的结果random.seed(42)# 创建一个Spark会话spark = SparkSession.builder.getOrCreate()# 创建ID列表ids = list(range(1, 11)) # 根据所需的唯一ID数调整此列表大小# 为每个ID创建两个可能的条件conditions = [[random.randint(1, 10) for _ in range(2)] for _ in ids]# 创建一个元组列表,其中每个元组是一行rows = [(id, random.choice(conditions[id-1]), date) for id in ids…