# 起因
rabbitmq 最近 consumer 报 403 拒绝某个账号访问某个 exchange,找 rabbitmq team 咨询,回复说最近把 mq 的 configure 权限收回了。也就是 client code 里不能再执行 create queue/exchange 的操作了。
# 异常
2021-03-12 02:30:39,268 pikaclient.py:153:INFO:Declaring exchange: xxx.xxx.exchange
2021-03-12 02:30:39,302 channel.py:1068:WARNING:Received remote Channel.Close (403): "ACCESS_REFUSED - access to exchange 'xxx.xxx.exchange' in vhost '/' refused for user 'xxxx'" on <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncSSLTransport object at 0x7fbadd054390> params=<URLParameters host=xxx.xxx.com port=5671 virtual_host=/ ssl=True>>>
# 查询
查看 log,有一行 declaring exchange 的记录,查看 code,正好有 2 个 function 的调用,就是 queue_declare 和 exchange_declare,查看这两个方法(pika)源码,可以看到说明
def exchange_declare(self, | |
exchange, | |
exchange_type='direct', | |
passive=False, | |
durable=False, | |
auto_delete=False, | |
internal=False, | |
arguments=None): | |
"""This method creates an exchange if it does not already exist, and if | |
the exchange exists, verifies that it is of the correct and expected | |
class. | |
If passive set, the server will reply with Declare-Ok if the exchange | |
already exists with the same name, and raise an error if not and if the | |
exchange does not already exist, the server MUST raise a channel | |
exception with reply code 404 (not found). | |
:param str exchange: The exchange name consists of a non-empty sequence | |
of these characters: letters, digits, hyphen, underscore, period, | |
or colon | |
:param str exchange_type: The exchange type to use | |
:param bool passive: Perform a declare or just check to see if it | |
exists | |
:param bool durable: Survive a reboot of RabbitMQ | |
:param bool auto_delete: Remove when no more queues are bound to it | |
:param bool internal: Can only be published to by other exchanges | |
:param dict arguments: Custom key/value pair arguments for the exchange | |
:returns: Deferred that fires on the Exchange.DeclareOk response | |
:rtype: Deferred | |
:raises ValueError: | |
""" |
def queue_declare(self, | |
queue, | |
passive=False, | |
durable=False, | |
exclusive=False, | |
auto_delete=False, | |
arguments=None): | |
"""Declare queue, create if needed. This method creates or checks a | |
queue. When creating a new queue the client can specify various | |
properties that control the durability of the queue and its contents, | |
and the level of sharing for the queue. | |
Use an empty string as the queue name for the broker to auto-generate | |
one | |
:param str queue: The queue name; if empty string, the broker will | |
create a unique queue name | |
:param bool passive: Only check to see if the queue exists | |
:param bool durable: Survive reboots of the broker | |
:param bool exclusive: Only allow access by the current connection | |
:param bool auto_delete: Delete after consumer cancels or disconnects | |
:param dict arguments: Custom key/value arguments for the queue | |
:returns: Deferred that fires on the Queue.DeclareOk response | |
:rtype: Deferred | |
:raises ValueError: | |
""" |
# 解决
调用上面两个方法是,添加 passive 参数,置为 True,即:
self._channel.queue_declare(queue=queue_name, passive=True, callback=cb, durable=True) | |
... | |
self._channel.exchange_declare(exchange=exchange_name, passive=True, exchange_type=self.EXCHANGE_TYPE, durable=True, callback=cb) |