django:信号机制

导读:本篇文章讲解 django:信号机制,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

环境:

  • win 10专业版
  • django 4.0
  • pycharm pro 2021.2.3

一,django信号

(一)什么是信号

通俗来说,信号就是通信双方约定的一种信息通知方式,双方通过信号来确定发生了什么事情,然后决定自己应该做什么:

  • 交通管理部门通过红绿灯告知司机与行人当前路口的通行许可。
  • 舰队之间通过旗语传递信息。

看得出,信号做基本的作用就是一方通知另一方发生了什么。

Django有一个信号调度器(signal
dispatcher)
,用来帮助解耦的应用获知框架内任何其他地方发生了操作。简单地说,信号允许某些发送器去通知一组接收器某些操作发生了。当许多代码段都可能对同一事件感兴趣时,信号特别有用。
—— Django Documentation: Signals

(二)信号的使用场景

1,信号的直接使用场景

通信双方显然都在关注同一件事情:

  • 交通管理部门、司机与行人都关注安全。
  • 舰队之间关注船舶航行信息。

而 Django 框架内的组件或应用共同关注的有请求与响应、用户的登录和注销、模型的操作、任务管理、测试、数据库的管理等。为此,Django 提供了 set of built-in signals

一些使用场景:

  • 用户登陆后,系统向他发送最新动态信息。
  • 数据库数据发生变化后,实现缓存数据同步变化。
  • 订单中商品数量影响库存数量,即不同模型的联动更新。

2,使用信号的终极目的

这是一个应用程序耦合的 Django 项目。黄色箭头显示哪些应用程序相互导入内容:

在这里插入图片描述这个项目紧密耦合的原因是应用程序之间存在循环依赖关系,导致无法在不破坏其他应用程序的情况下删除某个一个应用程序,因此应该避免这种循环依赖。

这是一个有单一依赖流的项目:

在这里插入图片描述这个项目的结构要好得多,但实现单一流程并不总是那么简单。这就是信号可以提供帮助的地方——使用信号来避免引入循环依赖,这就能够帮助解耦的应用获知框架内任何其他地方发生的操作。

信号的存在,避免让好不容易解耦的应用的耦合度再次变高——简单胜于复杂

3,使用原则

如果一个应用程序想要触发它引用的应用程序中的行为,则不要使用信号,而是直接导入它所需要的行为。

如果一个应用程序想要触发依赖于该应用程序的应用程序中的行为,则可以在第二个应用程序中用接收器接收第一个应用程序发送给它的信号。

如果信号接收器要处理大量I/O操作,也不要使用信号机制,因为它基于同步实现。

When to Use Django Signals

二,如何使用信号

先从源码解读信号,了解各部分关系后再自定义信号,最后讲如何使用django准备好的内置的信号。

(一)自定义与使用信号

所有的信号都是 django.dispatch.Signal 的实例,源码如下:

class Signal:
    """
    所有信号的基类。

    内部属性:
        receivers
            { receiverkey (id) : weakref(receiver) }
    """
    def __init__(self, use_caching=False):
        """
        创建一个新信号。
        """
        self.receivers = []
        if providing_args is not None:
            warnings.warn(
                'The providing_args argument is deprecated. As it is purely '
                'documentational, it has no replacement. If you rely on this '
                'argument as documentation, you can move the text to a code '
                'comment or docstring.',
                RemovedInDjango40Warning, stacklevel=2,
            )
        self.lock = threading.Lock()
        self.use_caching = use_caching
        # 为了方便起见,我们创建空缓存,即使它们没有被使用。
        # 关于缓存的一个注意事项:如果定义了use_caching,那么对于每个不同的发送方,我们缓存发送方在'sender_receivers_cache'中的接收方。当调用.connect()或.disconnect()并在send()中填充时,缓存将被清除。
        self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
        self._dead_receivers = False

    def connect(self, receiver, sender=None, weak=True, dispatch_uid=None):
       """
        将信号的接收器连接到发送器。

        参数:
            receiver
                用于接收信号的函数或实例方法。
                接收器必须是 hashable objects。
                如果weak为True,那么receiver必须是弱引用。
                接收器必须能够接受关键字参数。
                如果一个接收器在连接时使用了一个dispatch_uid参数,那么如果已经有接收器在连接时使用了这个dispatch_uid,那么该参数将不会被添加。

            sender
            	接收器应该响应的发送器。必须是一个Python对象,或None以让接收器接收来自任何接收器的事件。
            weak
                是否对接收器使用弱引用。默认情况下,模块将尝试使用接收器对象的弱引用。如果该参数为false,则将使用强引用。

            dispatch_uid
                一个标识符,用于唯一地标识接收器的特定实例。这通常是一个字符串,尽管它可以是任何 hashable objects。
        """
        from django.conf import settings

        # 如果DEBUG是打开的,检查是否得到一个良好的接收器。
        if settings.configured and settings.DEBUG:
            assert callable(receiver), "Signal receivers must be callable."

            # 检查关键字参数 **kwargs
            if not func_accepts_kwargs(receiver):
                raise ValueError("Signal receivers must accept keyword arguments (**kwargs).")

        if dispatch_uid:
            lookup_key = (dispatch_uid, _make_id(sender))
        else:
            lookup_key = (_make_id(receiver), _make_id(sender))

        if weak:
            ref = weakref.ref
            receiver_object = receiver
            # 检查绑定的方法。
            if hasattr(receiver, '__self__') and hasattr(receiver, '__func__'):
                ref = weakref.WeakMethod
                receiver_object = receiver.__self__
            receiver = ref(receiver)
            weakref.finalize(receiver_object, self._remove_receiver)

        with self.lock:
            self._clear_dead_receivers()
            if not any(r_key == lookup_key for r_key, _ in self.receivers):
                self.receivers.append((lookup_key, receiver))
            self.sender_receivers_cache.clear()

    def disconnect(self, receiver=None, sender=None, dispatch_uid=None):
        """
        断开接收器到发送端的连接。
		
		如果使用弱引用,则不需要调用disconnect。接收器将自动从发送中移除。

        参数:
            receiver
                要断开连接的已注册接收器。如果指定dispatch_uid,则可能为none。

            sender
                要断开连接的已注册发送器。

            dispatch_uid
                要断开连接的接收器的唯一标识符。
        """
        if dispatch_uid:
            lookup_key = (dispatch_uid, _make_id(sender))
        else:
            lookup_key = (_make_id(receiver), _make_id(sender))

        disconnected = False
        with self.lock:
            self._clear_dead_receivers()
            for index in range(len(self.receivers)):
                (r_key, _) = self.receivers[index]
                if r_key == lookup_key:
                    disconnected = True
                    del self.receivers[index]
                    break
            self.sender_receivers_cache.clear()
        return disconnected

    def has_listeners(self, sender=None):
        return bool(self._live_receivers(sender))

    def send(self, sender, **named):
        """
        将信号从发送器发送到所有连接到它的接收器。

        如果任何接收器引发错误,该错误将通过 send 传播回来,终止调度。因此,如果错误被引发,可能不会调用所有的接收器。

        参数:
            sender
                信号的发送器。是指定的对象或None。

            named
            	被命名的参数将被传递给接收器。

        返回一个元组对列表[(receiver, response),…]。
        """
        if not self.receivers or self.sender_receivers_cache.get(sender) is NO_RECEIVERS:
            return []

        return [
            (receiver, receiver(signal=self, sender=sender, **named))
            for receiver in self._live_receivers(sender)
        ]

    def send_robust(self, sender, **named):
        """
        发送信号从发送器到所有所连接的接收器中捕获错误。

        参数:
            sender
                信号的发送器。可以是任何Python对象(通常是通过一个connect注册的对象,如果你确实想要发生一些事情的话)。

            named
                被命名的参数将被传递给接收器。

        返回一个元组对列表[(receiver, response),…]。
        
        如果任何接收器引发错误(特别是任何Exception的子类),则返回该接收器的错误实例作为结果。
        """
        if not self.receivers or self.sender_receivers_cache.get(sender) is NO_RECEIVERS:
            return []

        # 用它可以接受的参数调用每个接收器。
        # 返回一个元组对列表[(receiver, response),…]。
        responses = []
        for receiver in self._live_receivers(sender):
            try:
                response = receiver(signal=self, sender=sender, **named)
            except Exception as err:
                logger.error(
                    'Error calling %s in Signal.send_robust() (%s)',
                    receiver.__qualname__,
                    err,
                    exc_info=err,
                )
                responses.append((receiver, err))
            else:
                responses.append((receiver, response))
        return responses

    def _clear_dead_receivers(self):
        # 注意,我们必须缓存weakref版本。
        if self._dead_receivers:
            self._dead_receivers = False
            self.receivers = [
                r for r in self.receivers
                if not(isinstance(r[1], weakref.ReferenceType) and r[1]() is None)
            ]

    def _live_receivers(self, sender):
        """
        得到解析的、存活的接收器的过滤序列

        这将检查弱引用并解析它们,然后只返回存活的接收器。
        """
        receivers = None
        if self.use_caching and not self._dead_receivers:
            receivers = self.sender_receivers_cache.get(sender)
            # 即使我们在调用_live_receivers()之前在.send()中检查了这种情况,但由于并发调用.send(),我们也可以在这里使用NO_RECEIVERS。
            if receivers is NO_RECEIVERS:
                return []
        if receivers is None:
            with self.lock:
                self._clear_dead_receivers()
                senderkey = _make_id(sender)
                receivers = []
                for (receiverkey, r_senderkey), receiver in self.receivers:
                    if r_senderkey == NONE_ID or r_senderkey == senderkey:
                        receivers.append(receiver)
                if self.use_caching:
                    if not receivers:
                        self.sender_receivers_cache[sender] = NO_RECEIVERS
                    else:
                        # 注意,我们必须缓存weakref版本。
                        self.sender_receivers_cache[sender] = receivers
        non_weak_receivers = []
        for receiver in receivers:
            if isinstance(receiver, weakref.ReferenceType):
                # 取消引用弱引用。
                receiver = receiver()
                if receiver is not None:
                    non_weak_receivers.append(receiver)
            else:
                non_weak_receivers.append(receiver)
        return non_weak_receivers

    def _remove_receiver(self, receiver=None):
        # 标记 self.receivers 名单上死亡的弱引用。如果是这样,我们将在持有 self.lock 时在 connect、disconnect 和 _live_receivers 中清理这些内容。注意,在这里做清理不是一个好主意,_remove_receiver()将作为垃圾收集的副作用被调用,因此调用可能发生在我们已经持有 self.lock 的时候。
        self._dead_receivers = True


def receiver(signal, **kwargs):
    """
    一个用来连接接收器和信号的装饰器。通过传入信号(或信号列表)和关键字参数来建立连接::

        @receiver(post_save, sender=MyModel)
        def signal_receiver(sender, **kwargs):
            ...

        @receiver([post_save, post_delete], sender=MyModel)
        def signals_receiver(sender, **kwargs):
            ...
    """
    def _decorator(func):
        if isinstance(signal, (list, tuple)):
            for s in signal:
                s.connect(func, **kwargs)
        else:
            signal.connect(func, **kwargs)
        return func
    return _decorator

从源码可以看出来,django信号机制使用观察者设计模式

使用django信号机制的完整过程如下:

  1. 创建信号、信号发送器和信号接收器。
  2. 将信号接收器连接到信号发送器,监听信号。
  3. 信号发送器发送信号。
  4. 信号发送器断开信号接收器。

1,创建信号、信号发送器和信号接收器

创建信号很简单,实例化django.dispatch.Signal类就行:

app/signals.py:
from django.dispatch import Signal

my_signal= Signal()
  • 为了规范项目结构,我们应该将信号放入 app/signals.py 文件中。

信号发送器就是一个信号的触发条件,它不过就是在应用的某处嵌入了信号的一段代码。比如创建好一个信号之后,我们想要在某个应用中的某个视图被执行的时候触发信号,那么这个视图就是信号发送器,因为在其中调用了信号发送方法。

合适的地方:
from app.signals import my_signal

# in a function, class, function_based view or class-based view
	...
	my_signal.send(sender=name_of_snippet, a_kwarg=value)
	...

信号接收器可以是任何 Python 函数或方法,只需要像信号发送器一样放到合适的地方就行。

合适的地方:
def my_callback(sender=name_of_snippet, **kwargs):
	...
  • 在实践中,信号接收器通常定义在与之相关的应用中的子模块signals.py文件中。

2,注册信号接收器

将信号接收器连接到信号发送器的操作叫注册接收器,注册完成后,信号接收器就开始监听这个信号。

一种方法是调用 Signal.connect() 方法注册一个函数:

app/signals.py:
import my_callback	# 接收器回调函数
my_signal.connect(receiver=my_callback)

另一种方法是对接收器函数使用 receiver() 装饰器

合适的地方或:
from app.signals import my_signal

@receiver(my_signal)
def my_callback(sender=name_of_snippet, **kwargs):
    ...

如果你正在使用receiver()装饰器,则可以在应用程序配置类ready()方法中中导入signals子模块,这将隐式地连接信号处理程序:

app/apps.py:
import my_callback
from django.apps import AppConfig

class MyAppConfig(AppConfig):
    ...

    def ready(self):
        # Implicitly connect a signal handlers decorated with @receiver.
		from app.signals import my_signal
        # Explicitly connect a signal handler.
        my_signal.connect(receiver=my_callback)

3,信号发送器发送信号

在 Django 中有两种发送信号的方法。

所有内置信号都使用Signal.send(sender, **kwargs)方法发送信号,不用提供sender参数。

还能使用Signal.send(sender, **kwargs)方法发送信号,需要提供sender参数(大多数情况下是一个类)。

合适的地方:
from app.signals import my_signal

# in a function, class, function_based view or class-based view
	...
	my_signal.send(sender=name_of_snippet, a_kwarg=value)
	...

相同之处:

  • 都返回一个元组对列表 [(receiver, response), … ]来表示被调用的接收器函数及其响应值的列表。

不同之处:

  • send()不捕获接收器引起的任何异常,它只是允许错误传播。因此,并非所有的接收器都会在出现错误时接收信号;send_robust() 捕获从 Python 的 Exception 类派生的所有错误,并确保所有接收器都收到信号通知。如果发生错误,将在引发错误的接收器的元组对中返回错误实例。

4,信号发送器断开信号接收器

要断开接收器与信号的连接,调用Signal.disconnect()方法。如果接收端已断开连接,则该方法返回True,否则返回False。

合适的地方:
from app.signals import my_signal

# in a function, class, function_based view or class-based view
	...
	my_signal.disconnect(receiver=my_callback)
	...

5,接收特定发送者的信号

有些信号被多次发送,但你只对接收这些信号的某个子集感兴趣。例如内置的信号 django.db.models.signals.pre_save 会在模型保存之前发送的信号,但默认情况下,它会将任何模型何时被保存都通过信号发送出去,而我们指向知道某个特定模型何时被保存,就应该在连接接收器时指定sender参数

合适的地方:
from app.signals import my_signal
from django.db.models.signals import pre_save
from django.dispatch import receiver
from myapp.models import MyModel


@receiver(pre_save, sender=MyModel)
def my_callback(sender=name_of_snippet, **kwargs):
    ...

6,防止重复信号

在某些情况下,连接接收器到信号的代码可能被执行多次,这可能会导致接收器函数被注册多次,一个信号事件就可能被调用同样多次。例如,应用程序配置类中的ready()方法在测试期间可能被多次执行。更普遍的是,在项目的任何地方导入定义信号的模块都会发生这种情况,因为信号注册的运行次数与导入操作执行的次数相同。

如果此行为会产生问题(例如在保存模型时使用信号可能会发送多封相同的电子邮件),则可以在注册接收器时使用dispatch_uid参数来标识接收器函数。这个标识符通常是一个字符串,尽管任何可散列对象都可以。最终的结果是,对于每个唯一的 dispatch_uid 值,接收器函数只与信号绑定一次:

app/signals.py:
import my_callback	# 接收器回调函数
my_signal.connect(receiver=my_callback, dispatch_uid="my_unique_identifier")

(二)内置的信号

使用内置信号时,只需在合适的位置将接收器注册到这个信号,然后在这个接受器中获取信号发送过来的参数并加以使用。

比如django.db.models.signals.post_save信号就会在调用模型的 save() 方法后发送。

app/signals.py:
from django.db.models.signals import post_save
from index.models import MyModel

# 设置内置信号post_save的回调函数signal_post_save
def signal_orders(sender, **kwargs):
    print("pre_save is coming")
    # 输出sender的数据
    print(sender)
    # 输出kwargs的所有数据
    print(kwargs)
    # instance代表当前修改或新增的模型对象
    instance = kwargs.get('instance')
    # created判断当前操作是否在模型中新增数据,若新增数据则为True
    created = kwargs.get('created')
    # using代表当前使用的数据库
    # 如果连接了多个数据库,则显示当前修改或新增的数据表所在数据库的名称
    using = kwargs.get('using')
    # update_fields控制需要更新的字段,默认为None
    update_fields = kwargs.get('update_fields')

# 将内置信号post_save与回调函数signal_post_save绑定
post_save.connect(signal_orders, sender=MyModel)

信号内容详见set of built-in signals

三,信号的使用示例

(一)订单的创建与取消对商品总数的影响

一般地,对于一个 网上商城或者进销存系统来说,商品库存数量随出货订单中商品的数量的变化而变化:

  • 用户完成订单支付后,原商品库存数量减去订单中商品数量就是现商品库存数量。
  • 用户取消订单或退货后,原商品库存数量加上订单中商品数量就是现商品库存数量。

总之,这种不同模型间的数据联动可以通过信号来完成。

1,创建模型:

# coding=gbk
from django.db import models


class ProductInfo(models.Model):
    id = models.AutoField(primary_key=True)
    name = models.CharField(max_length=20)
    number = models.IntegerField()

    def __str__(self):
        return self.name

    class Meta:
        verbose_name = '商品信息'
        verbose_name_plural = '商品信息'


STATE = (
    (0, '取消'),
    (1, '创建'),
)


class OrderInfo(models.Model):
    id = models.AutoField(primary_key=True)
    product_id = models.IntegerField()
    product_quantity = models.IntegerField()
    state = models.IntegerField(choices=STATE, default=1)

    def __str__(self):
        return id

    class Meta:
        verbose_name = '订单信息'
        verbose_name_plural = '订单信息'

2,创建页面:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>订单</title>
    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"
            integrity="sha384-ka7Sk0Gln4gmtz2MlQnikT1wXgYsOg+OMhuP+IlRH9sENBO0LRn5q+8nbTov4+1p"
            crossorigin="anonymous"></script>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet"
          integrity="sha384-1BmE4kWBq78iYhFldvKuhfTAU6auU8tT94WrHftjDbrCEXSU1oBoqyl2QvZ6jIW3" crossorigin="anonymous">
    <script src="https://cdn.jsdelivr.net/npm/@popperjs/core@2.10.2/dist/umd/popper.min.js"
            integrity="sha384-7+zCNj/IqJ95wo16oMtfsKbZ9ccEh31eOz1HGyDuCQ6wgnyJNSYdrPa03rtR1zdB"
            crossorigin="anonymous"></script>
    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.min.js"
            integrity="sha384-QJHtvGhmr9XOIpI6YVutG+2QOK9T+ZnN4kzFN1RtK3zEFEIsxhlmWl5/YESvpZ13"
            crossorigin="anonymous"></script>
</head>


<body>
<div class="container">
    <form method="post">
        <div class="mb-3">
            <label for="exampleInputProductID" class="form-label">商品ID:</label>
            <input type="text" class="form-control" id="exampleInput" name="ProductID">
        </div>
        <div class="mb-3 {{ display }}">
            <label for="exampleInputProductQuantity" class="form-label">商品数量:</label>
            <input type="text" class="form-control" id="exampleInputProductQuantity" name="ProductQuantity">
        </div>
        <div class="mb-3 form-check">
            <label class="form-check-label" for="exampleCheck1">确认订单信息</label>
            <input type="checkbox" class="form-check-input" id="exampleCheck" name="Check">
        </div>
        <button type="submit" class="btn btn-success {{ display }}" id="create" name="create"
                formaction="{% url 'index:create' %}" value="create">创建订单
        </button>
        <button type="submit" class="btn btn-danger" id="cancel" name="cancel" formaction="{% url 'index:cancel' %}"
                value="cancel">取消订单
        </button>
    </form>
</div>
</body>
</html>

3,创建信号模块:

# coding=gbk
from django.db.models.signals import post_save
from index.models import OrderInfo, ProductInfo


# 设置内置信号post_save的回调函数signal_post_save
def signal_orders(sender, **kwargs):
    print("pre_save is coming")
    # 输出sender的数据
    print(sender)
    # instance代表当前修改或新增的模型对象
    instance = kwargs.get('instance')

    # 当订单状态等于0的时候,说明订单已经取消,商品数量加
    if instance.state == 0:
        p = ProductInfo.objects.get(id=instance.product_id)
        p.number += instance.product_quantity
        p.save()
    # 当订单状态等于1说明订单是新增,商品数量减
    elif instance.state == 1:
        p = ProductInfo.objects.get(id=instance.product_id)
        p.number -= instance.product_quantity
        p.save()


# 将内置信号post_save与回调函数signal_post_save绑定
post_save.connect(signal_orders, sender=OrderInfo)

4,创建路由:

根路由:
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
    path('admin/', admin.site.urls),
    path('', include(('index.urls', 'index'), namespace='index'))
]

应用路由:
from django.urls import path
from index.views import order_info, create_order, cancel_order

urlpatterns = [
    # 定义路由
    path('create/', create_order, name='create'),
    path('cancel/', cancel_order, name='cancel'),
]

(二)

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/98064.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!