Skip to content

Commit 460cf8c

Browse files
committed
handle pre-existing exchanges better
1 parent cba5b5c commit 460cf8c

2 files changed

Lines changed: 80 additions & 12 deletions

File tree

varys/consumer.py

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import functools
22
import pika
3+
from pika import exceptions as pika_exceptions
34
import time
45

56
from varys.utils import varys_message
@@ -71,12 +72,44 @@ def run(self):
7172

7273
self._connection = pika.BlockingConnection(self._parameters)
7374
self._channel = self._connection.channel()
74-
self._channel.exchange_declare(
75-
exchange=self._exchange,
76-
exchange_type=self._exchange_type,
77-
durable=True,
78-
)
79-
self._channel.queue_declare(queue=self._queue, durable=True)
75+
try:
76+
self._channel.exchange_declare(
77+
exchange=self._exchange,
78+
exchange_type=self._exchange_type,
79+
durable=True,
80+
passive=True,
81+
)
82+
except pika_exceptions.ChannelClosed as e:
83+
if e.reply_code != 404:
84+
raise
85+
86+
self._log.info(
87+
f"Exchange {self._exchange} does not exist, creating it..."
88+
)
89+
self._channel = self._connection.channel()
90+
self._channel.exchange_declare(
91+
exchange=self._exchange,
92+
exchange_type=self._exchange_type,
93+
durable=True,
94+
)
95+
try:
96+
self._channel.queue_declare(queue=self._queue, durable=True)
97+
except pika_exceptions.ChannelClosed as e:
98+
if e.reply_code != 404:
99+
raise
100+
101+
self._log.info(
102+
f"Queue {self._queue} does not exist, creating it..."
103+
)
104+
self._channel = self._connection.channel()
105+
self._channel.exchange_declare(
106+
exchange=self._exchange,
107+
exchange_type=self._exchange_type,
108+
durable=True,
109+
passive=True,
110+
)
111+
self._channel.queue_declare(queue=self._queue, durable=True)
112+
80113
self._channel.queue_bind(
81114
queue=self._queue,
82115
exchange=self._exchange,

varys/producer.py

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import functools
22
import pika
3+
from pika import exceptions as pika_exceptions
34
import time
45
import json
56

@@ -85,12 +86,46 @@ def run(self):
8586
try:
8687
self._connection = pika.BlockingConnection(self._parameters)
8788
self._channel = self._connection.channel()
88-
self._channel.exchange_declare(
89-
exchange=self._exchange,
90-
exchange_type=self._exchange_type,
91-
durable=True,
92-
)
93-
self._channel.queue_declare(queue=self._queue, durable=True)
89+
try:
90+
self._channel.exchange_declare(
91+
exchange=self._exchange,
92+
exchange_type=self._exchange_type,
93+
durable=True,
94+
passive=True,
95+
)
96+
except pika_exceptions.ChannelClosed as e:
97+
if e.reply_code != 404:
98+
raise
99+
100+
self._log.info(
101+
f"Exchange {self._exchange} does not exist, creating it..."
102+
)
103+
self._channel.exchange_declare(
104+
exchange=self._exchange,
105+
exchange_type=self._exchange_type,
106+
durable=True,
107+
)
108+
109+
try:
110+
self._channel.queue_declare(
111+
queue=self._queue, durable=True, passive=True
112+
)
113+
except pika_exceptions.ChannelClosed as e:
114+
if e.reply_code != 404:
115+
raise
116+
117+
self._log.info(
118+
f"Queue {self._queue} does not exist, creating it..."
119+
)
120+
self._channel = self._connection.channel()
121+
self._channel.exchange_declare(
122+
exchange=self._exchange,
123+
exchange_type=self._exchange_type,
124+
durable=True,
125+
passive=True,
126+
)
127+
self._channel.queue_declare(queue=self._queue, durable=True)
128+
94129
self._channel.queue_bind(
95130
queue=self._queue,
96131
exchange=self._exchange,

0 commit comments

Comments
 (0)