使用不到100行Python代码的动态规划进行库存优化

使用不到百行Python代码进行库存优化的动态规划

如何通过动态规划掌握库存决策以实现最大利润

来自Unsplash的Christin Hume的照片

介绍

库存优化是一个广泛存在于许多领域的问题。核心问题在于弄清楚:

“为了最大化利润,每次周期性地为您的商店订购多少产品?”

我想你是一家自行车店的经理。每天,您需要联系供应商为您的商店订购一些自行车。如果您每天订购的自行车太多,您将花费太多的钱来维护和保持店内的自行车。另一方面,如果您订购得太少,可能无法满足客户的需求。您需要制定一种平衡两个因素的“订购策略”。

我们将分解这个问题并找出最优的“订购策略”。这个“最优订购策略”基于三个支柱:

  • 马尔可夫过程
  • 马尔可夫奖励过程
  • 马尔可夫决策过程

在这里和这里,我详细介绍了如何构建“库存优化”问题、马尔可夫过程和马尔可夫奖励过程。

在这篇博客中,我们将总结所有内容,并将马尔可夫过程和马尔可夫奖励过程与马尔可夫决策过程相连接,以得到最优的“订购策略”,并附带Python代码。

马尔可夫决策过程:

马尔可夫决策过程(MDP)是一个数学模型,其关键组件包括状态、动作、转移概率和奖励。

在马尔可夫奖励过程中,决策是固定的,我们不考虑每个状态的所有可能动作。在马尔可夫决策过程(MDP)中,我们需要构建一个数据结构,对于每个状态,我们需要考虑所有可能的动作,并根据$(S_t,A_t)$来确定$(S_{t+1}, R_{t+1})$。

马尔可夫决策过程数据结构示例

这里,我给您一个马尔可夫决策过程字典的示例,您可以看到对于每个状态,需要考虑所有可能的动作:

马尔可夫决策过程字典 = {"当前状态A":{"动作1":{("从A动作1到下一个S1","奖励1"):"从A动作1到下一个S1的概率",                                           ("从A动作1到下一个S2","奖励2"):"从A动作1到下一个S2的概率"},                                              "动作2":{("从A动作2到下一个S1","奖励2"):"从A动作2到下一个S1的概率",                                           ("从A动作2到下一个S2","奖励2"):"从A动作2到下一个S2的概率"}},                                          "当前状态B":{"动作1":{("从B动作1到下一个S1","奖励1"):"从B动作1到下一个S1的概率",                                           ("从B动作1到下一个S2","奖励2"):"从B动作1到下一个S2的概率"},                                              "动作2":{("从B动作2到下一个S1","奖励2"):"从B动作2到下一个S1的概率",                                           ("从B动作2到下一个S2","奖励2"):"从B动作2到下一个S2的概率"}}}对于当前状态,操作 in 马尔可夫决策过程字典.items():    print(f"当前状态: {当前状态}")        for 动作, 转换 in 操作.items():        print(f"  动作: {动作}")                for (下一个状态, 奖励), 概率 in 转换.items():            print(f"  ({下一个状态},{奖励}): {概率}")
Python代码的输出 — 来源:作者

马尔可夫决策过程优化库存

在这里,我们正在构建字典,其中对于每个状态,我们考虑所有可能的动作,对于每个动作,我们考虑所有可能的状态和奖励。该字典的名称是**MDP_dict**,我在下面给出了代码:

from typing import Dict, Tuple# poisson is used to find pdf of Poisson distribution from scipy.stats import poissonMDP_dict: Dict[tuple, Dict[tuple, tuple]] = {}user_capacity = 2user_poisson_lambda = 1holding_cost = 1missedcostumer_cost = 10for alpha in range(user_capacity+1):                                                                                               for beta in range(user_capacity + 1 - alpha):                # 这是当前状态St        state = (alpha, beta)                                           # 这是早上8点钟的初始库存,即总共有的自行车数量         init_inv = alpha + beta                                         base_reward = -alpha* holding_cost                action = {}        # 考虑所有可能的动作        for order in range(user_capacity-init_inv +1):                        #action = {}            dict1 = {}            for i in range(init_inv +1):            # 如果初始需求能够满足需求                if i <= (init_inv-1):                                # 特定需求发生的概率                    transition_prob = poisson.pmf(i,user_poisson_lambda)                    dict1[((init_inv - i, order), base_reward)] = transition_prob                                     # 如果初始需求不能满足需求                else:                                    transition_prob = 1- poisson.cdf(init_inv -1, user_poisson_lambda)                                # 未满足需求的概率                    transition_prob2 = 1- poisson.cdf(init_inv, user_poisson_lambda)                                # 总奖励                                    reward = base_reward - missedcostumer_cost*((user_poisson_lambda*transition_prob) - \                                                  init_inv*transition_prob2)                                    dict1[((init_inv - i, order),reward)] = transition_prob                    #if state in MDP_dict:            action[order] = dict1        MDP_dict[state]= actionMDP_dict# 常量

“MDP_dict”是一个Python字典,其中每个键表示“当前状态”,值是“该状态下的所有可能动作”,对应的下一个状态、即时奖励和下一个状态的概率。以下是示意图的解释:

MDP_dict示意图 — 来源:作者

动态规划

Richard Bellman(1950年代)首次提出了称为动态规划(DP)的术语。动态规划是一种通过将复杂问题分解为简单子问题来解决复杂问题的方法。

DP通常指的是马尔可夫决策过程的一般理论和寻找MDP中最优策略的算法,在很大程度上依赖于贝尔曼方程。

在本文的上下文中,我们使用动态规划这个术语来解决库存优化问题的最优策略。

通常有两种重要的DP算法:

值函数迭代算法Bellman 1957

– 策略迭代算法(Howard 1960)

在本文中,我们将重点关注策略迭代算法,并将在Python中实现它。

库存优化的策略迭代算法:

策略迭代算法是一种用于寻找给定MDP的最优策略的方法。该算法基于以下思想:

– 1)从一个初始策略$π_0$开始。

– 2)通过计算状态值函数$V^{π_0}$来评估策略$π_0$。

– 3)通过根据$V^{\pi_0}$的贪婪行为来改进策略,得到一个新的策略$\pi_1$。

该算法按照上述步骤迭代,直到策略收敛(策略不再发生变化)。我们将在以下几个部分逐一介绍这三个阶段。

简单示意图,用于解释策略迭代算法 — 来源:作者

1) 从初始策略开始

策略迭代算法需要一个初始策略(订货策略),可以是任意的策略。在本文中,我们将使用第二篇文章中找到的策略,即以下策略:

初始策略:

π_0=C-(α + β)

初始策略是,在库存的每个阶段,我们订购$C-(α + β)$的数量,其中$C$是库存的容量,$α$是库存中初始物品的总和,$β$是明天早上要到货的物品数量。

以下是初始策略的代码:

user_capacity_val = 2def policy_0_gen(user_capacity: int):                # 生成初始策略        return {(alpha, beta): user_capacity - (alpha + beta)                 for alpha in range(user_capacity + 1)                 for beta in range(user_capacity + 1 - alpha)}policy_0 = policy_0_gen(user_capacity_val)policy_0

2) 通过计算状态值函数V^{π_0}来评估策略π_0。

需要注意的是,任何具有固定策略的马尔可夫决策过程都会导致**暗含的**马尔可夫奖励过程。因此,就像前一篇文章一样,如果我们有马尔可夫奖励过程,我们可以找到每个状态的值函数。

换句话说:

参考:作者

下面的函数将获取一个**固定策略**作为输入,并返回一个暗含的马尔可夫奖励过程:

def MRP_using_fixedPolicy(full_MDP, policy):        # 使用固定策略计算马尔可夫奖励过程        MRP_policy = {}        for state in full_MDP.keys():            action = policy[state]            MRP_policy[state] = full_MDP[state][action]        return MRP_policy

例如,我们可以将初始策略提供给以下函数,并且它将返回暗含的马尔可夫奖励过程:

MRP_p0=MRP_using_fixedPolicy(MDP_dict, policy_0)MRP_p0
Python 代码的输出 — 来源:作者

有了马尔可夫奖励过程,就很容易找到每个状态的即时奖励和状态值函数:

def calculate_expected_immediate_rewards(MRP_policy):        # 计算马尔可夫奖励过程的即时奖励        E_immediate_R = {}        for from_state, value in MRP_policy.items():            expected_reward = sum(reward[1] * prob for (reward, prob) in value.items())            E_immediate_R[from_state] = expected_reward        return E_immediate_RR_ime_p0 = calculate_expected_immediate_rewards(MRP_p0)R_ime_p0

import numpy as npimport pandas as pddef create_transition_probability_matrix(MRP_policy):        # 创建转移概率矩阵        states = list(MRP_policy.keys())        num_states = len(states)        trans_prob = np.zeros((num_states, num_states))        df_trans_prob = pd.DataFrame(trans_prob, columns=states, index=states)        for i, from_state in enumerate(states):            for j, to_state in enumerate(states):                for (new_state, reward) in MRP_policy.get(from_state, {}):                    if new_state == to_state:                        probability = MRP_policy[from_state].get((new_state, reward), 0.0)                        df_trans_prob.iloc[i, j] = probability                                return df_trans_probdef calculate_state_value_function(trans_prob_mat, expected_immediate_rew, gamma=0.9):        # 计算状态值函数        states = list(expected_immediate_rew.keys())        R_exp = np.array(list(expected_immediate_rew.values()))        val_func_vec = np.linalg.solve(np.eye(len(R_exp)) - gamma * trans_prob_mat, R_exp)        MarkRevData = pd.DataFrame({'Expected Immediate Reward': R_exp, 'Value Function': val_func_vec},                                    index=states)        return MarkRevDatatrans_prob_p0 = create_transition_probability_matrix(MRP_p0)state_val_p0 = calculate_state_value_function(trans_prob_mat=trans_prob_p0,                               expected_immediate_rew=R_ime_p0)state_val_p0
Python代码的输出 — 来源:作者

3)通过贪婪地行动来改善策略,基于$V^{π_0}$得到新策略$π_1$。

策略迭代算法的最后一部分是通过贪婪地行动来改善策略,基于$V^{\pi_0}$得到新策略$\pi_1$。

贪婪方程基于贝尔曼方程,实际上是为每个状态找到具有最高**状态值**函数的动作。

贝尔曼最优性方程 — 来源:作者

这是贪婪操作的代码:

def greedy_operation(MDP_full, state_val_policy, old_policy, gamma=0.9):        # 执行贪婪操作以改善策略        new_policy = {}        for state in old_policy.keys():            max_q_value, best_action  = float('-inf'), None            state_val_dict = state_val_policy.to_dict(orient="index")            for action in MDP_full[state].keys():                q_value = 0                for (next_state, immediate_reward), probability in MDP_full[state][action].items():                    q_value = q_value +  probability * (immediate_reward + gamma *                        (state_val_dict[next_state]["Value Function"]))                if q_value > max_q_value:                    max_q_value, best_action = q_value, action            new_policy[state] = best_action        return new_policynew_policy = greedy_operation(MDP_full=MDP_dict,                               state_val_policy=state_val_p0,                               old_policy=policy_0)

策略迭代算法经过第一轮迭代后的新策略如下:

new_policy

在策略迭代算法中,我们反复进行上述三个步骤,直到策略收敛。换句话说,我们保持在上述三个步骤上进行迭代,直到策略停止变化。这是策略迭代算法的代码:

def policy_iteration():        # 执行策略迭代以找到最优策略        policy = policy_0_gen(user_capacity_val)        while True:            MRP_policy_p0 = MRP_using_fixedPolicy(MDP_dict, policy)            expected_immediate_rew = calculate_expected_immediate_rewards(MRP_policy_p0)            trans_prob_mat_val = create_transition_probability_matrix(MRP_policy_p0)            value_function = calculate_state_value_function(trans_prob_mat=trans_prob_mat_val,                                                            expected_immediate_rew=expected_immediate_rew,                                                            gamma=0.9)            new_policy = greedy_operation(MDP_full=MDP_dict,                                           state_val_policy=value_function,                                           old_policy=policy)            if new_policy == policy:                break            policy = new_policy                opt_policy = new_policy        opt_value_func = value_function                return opt_policy, opt_value_func

最优顺序是什么?

现在,我们可以查看策略迭代的结果,并查看每个状态的最优顺序是什么。以下是代码:

for state, order_quantity in opt_policy.items():    print(f"对于状态{state},最优订购数量是:{order_quantity}")
最优的最终策略 — 来源:作者

将所有内容放在一起:

我将所有的代码都放在一个Python类中,并添加了相应的方法。

import numpy as np
from scipy.stats import poisson
import pandas as pd

class MarkovDecisionProcess:
    def __init__(self, 用户容量, 泊松分布率, 持有成本, 短缺成本, 折现系数):
        # 使用给定参数初始化MDP
        self.用户容量 = 用户容量
        self.泊松分布率 = 泊松分布率
        self.持有成本, self.短缺成本 = 持有成本, 短缺成本
        self.折现系数 = 折现系数
        self.完整的MDP = self.创建完整的MDP()  # 创建完整的MDP

    def 创建完整的MDP(self):
        # 创建完整的MDP字典
        MDP字典 = {}
        for alpha in range(self.用户容量 + 1):
            for beta in range(self.用户容量 + 1 - alpha):
                状态, 初始库存 = (alpha, beta), alpha + beta
                动作 = {}
                for 订购量 in range(self.用户容量 - 初始库存 + 1):
                    字典1 = {}
                    for i in range(初始库存 + 1):
                        if i <= (初始库存 - 1):
                            转移概率 = poisson.pmf(i, self.泊松分布率)
                            字典1[((初始库存 - i, 订购量), -alpha * self.持有成本)] = 转移概率
                        else:
                            转移概率 = 1 - poisson.cdf(初始库存 - 1, self.泊松分布率)
                            转移概率2 = 1 - poisson.cdf(初始库存, self.泊松分布率)
                            奖励 = -alpha * self.持有成本 - self.短缺成本 * ((self.泊松分布率 * 转移概率) - 初始库存 * 转移概率2)
                            字典1[((0, 订购量), 奖励)] = 转移概率
                    动作[订购量] = 字典1
                MDP字典[状态] = 动作
        return MDP字典

    def 初始策略生成(self):
        # 生成初始策略
        return {(alpha, beta): self.用户容量 - (alpha + beta)
                for alpha in range(self.用户容量 + 1)
                for beta in range(self.用户容量 + 1 - alpha)}

    def 使用固定策略创建MRP(self, 策略):
        # 使用固定策略创建MRP
        return {状态: self.完整的MDP[状态][动作]
                for 状态, 动作 in 策略.items()}

    def 计算状态值函数(self, MRP策略):
        # 计算MRP策略的预期即时奖励
        预期奖励 = {}
        for 起始状态, 值 in MRP策略.items():
            预期奖励值 = sum(奖励[1] * 概率 for (奖励, 概率) in 值.items())
            预期奖励[起始状态] = 预期奖励值
        # 创建转移概率矩阵
        状态列表 = list(MRP策略.keys())
        转移概率 = np.zeros((len(状态列表), len(状态列表)))
        df转移概率 = pd.DataFrame(转移概率, columns=状态列表, index=状态列表)
        for i, 起始状态 in enumerate(状态列表):
            for j, 目标状态 in enumerate(状态列表):
                for (新状态, 奖励) in MRP策略.get(起始状态, {}):
                    if 新状态 == 目标状态:
                        概率 = MRP策略[起始状态].get((新状态, 奖励), 0.0)
                        df转移概率.iloc[i, j] = 概率
        # 计算状态值函数
        即时奖励期望 = np.array(list(预期奖励.values()))
        状态值函数 = np.linalg.solve(np.eye(len(即时奖励期望)) - self.折现系数 * df转移概率, 即时奖励期望)
        MarkRevData = pd.DataFrame({'预期即时奖励': 即时奖励期望, '值函数': 状态值函数}, index=状态列表)
        return MarkRevData

    def 贪婪改进(self, MDP完整, 状态值策略, 旧策略):
        # 执行贪婪操作以改进策略
        新策略 = {}
        for 状态 in 旧策略.keys():
            单位最大值, 最佳动作  = float('-inf'), None
            状态值字典 = 状态值策略.to_dict(orient="index")
            for 动作 in MDP完整[状态].keys():
                累计值 = 0
                for (下一状态, 即时奖励), 概率 in MDP完整[状态][动作].items():
                    累计值 = 累计值 +  概率 * (即时奖励 + self.折现系数 *
                        (状态值字典[下一状态]["值函数"]))
                if 累计值 > 最大值:
                    最大值, 最佳动作 = 累计值, 动作
            新策略[状态] = 最佳动作
        return 新策略

    def 策略迭代(self):
        # 执行策略迭代以找到最优策略
        策略 = self.初始策略生成()
        while True:
            MRP策略_p0 = self.使用固定策略创建MRP(策略)
            状态值函数 = self.计算状态值函数(MRP策略_p0)
            新策略 = self.贪婪改进(self.完整的MDP, 状态值函数, 策略)
            if 新策略 == 策略:
                break
            策略 = 新策略
        最优策略, 最优值函数 = 新策略, 状态值函数
        return 最优策略, 最优值函数

使用Markov决策过程类的示例

# 示例用法:用户容量 = 2泊松λ = 1.0持有成本 = 1缺货成本 = 10γ = 0.9MDP_Example = MarkovDecisionProcess(user_capacity, poisson_lambda, holding_cost, stockout_cost, gamma)opt_policy,opt_val = MDP_Example.policy_iteration()# 打印最优策略print("最优策略:")对于状态,订购数量 in opt_policy.items():执行打印(f"对于状态{state},最优订购数量为:{order_quantity}")# 打印最优值函数print("\n最优值函数:")打印(opt_val)
Python代码的输出 - 来源:作者

摘要:

  • 库存优化不是一个静态优化问题;相反,它是一个顺序决策问题。这意味着我们需要一个“自适应”策略,其中决策(订单数量)取决于库存的状态。
  • 在这篇博客中,我们根据动态规划算法找到了“最优”的订购策略。
  • 目标是建立关于如何思考库存优化问题、如何构建它们以及如何使用适当的数学建模方法解决它们的基础知识。

参考文献:

感谢您一直阅读!

我希望这篇文章提供了一个易于理解的教程,教您如何使用Python进行库存优化。

如果您认为本文帮助您更多地了解库存优化和马尔科夫过程,请点赞并关注!