Skip to content

Commit 979b5ef

Browse files
authored
Properly handle permissions (#31)
* add permissions tests (should fail) * add definitions json * use real definitions json * initialise exchange with local admin account * purge queue during setup * handle pre-existing exchanges better * re-open channel after 404 on exchange declare * dump config to file properly * fix fake varys config again * update perms * fix perms typo in definition * fix missing queue in teardown purge * add more tests * fix logger checks * remove exception assertion * properly raise exceptions after logging * try to raise exception outside of catch * specifically test for 403 * text for exception presence inside logfile instead * use caplog properly * print caplog text * try using unittest logging func instead * read the logfile manually like a caveman * add egg-info to gitirgnore * Bump version tag * recommended changes * update package description * update logger closed check * Add catch for stopped connection in producer * no need to re-declare exchanges which should always exist at this point
1 parent 6a718fe commit 979b5ef

9 files changed

Lines changed: 267 additions & 20 deletions

File tree

.github/workflows/pytest.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ jobs:
3636
chmod a+rwx .rabbitmq/*
3737
- name: Run RabbitMQ server
3838
run: |
39-
docker run -d -v ./.rabbitmq:/.rabbitmq -e RABBITMQ_CONFIG_FILE=/.rabbitmq/rabbitmq.conf -p 5671:5671 -p 5672:5672 rabbitmq
39+
docker run -d -v ./.rabbitmq:/.rabbitmq -e RABBITMQ_CONFIG_FILE=/.rabbitmq/rabbitmq.conf -p 5671:5671 -p 5672:5672 rabbitmq:4.2.1
4040
- name: Install dependencies
4141
run: |
4242
python -m pip install --upgrade pip
@@ -47,7 +47,7 @@ jobs:
4747
run: |
4848
# sometimes the scripts hang because the channels can't be closed,
4949
# so run under a short timeout
50-
timeout 60s pytest -v --cov-report=term-missing --cov=varys
50+
timeout 120s pytest -v --cov-report=term-missing --cov=varys
5151
- name: "Upload Logfile"
5252
uses: actions/upload-artifact@v4
5353
with:

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ varys/__pycache__*
33
tests/__pycache__*
44
tests/test.log
55
varys.egg-info/*
6+
varys_client.egg-info/*
67
.pytest_cache*

.rabbitmq/definitions.json

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
{
2+
"permissions": [
3+
{
4+
"configure": ".*",
5+
"read": ".*",
6+
"user": "guest",
7+
"vhost": "/",
8+
"write": ".*"
9+
},
10+
{
11+
"configure": "test-exchange\\.\\w*|test-exchange-3\\.\\w*|test-exchange-3",
12+
"read": "test-exchange|test-exchange\\.\\w.*|test-exchange-3|test-exchange-3\\.\\w.*",
13+
"user": "guest2",
14+
"vhost": "/",
15+
"write": "test-exchange|test-exchange\\.\\w.*|test-exchange-3|test-exchange-3\\.\\w.*"
16+
}
17+
],
18+
"bindings": [],
19+
"queues": [],
20+
"parameters": [],
21+
"policies": [],
22+
"rabbitmq_version": "4.2.1",
23+
"rabbit_version": "4.2.1",
24+
"exchanges": [],
25+
"vhosts": [
26+
{
27+
"limits": [],
28+
"metadata": {
29+
"description": "Default virtual host",
30+
"tags": [],
31+
"default_queue_type": "classic"
32+
},
33+
"name": "/"
34+
}
35+
],
36+
"users": [
37+
{
38+
"hashing_algorithm": "rabbit_password_hashing_sha256",
39+
"limits": {},
40+
"name": "guest",
41+
"password_hash": "LzA8uEF2TYeMtmxczECa40VAIOdZmg08i6R+L4XEeg9b62Br",
42+
"tags": [
43+
"administrator"
44+
]
45+
},
46+
{
47+
"hashing_algorithm": "rabbit_password_hashing_sha256",
48+
"limits": {},
49+
"name": "guest2",
50+
"password_hash": "nV8dlnyFAnteKsCYrrHUPmuNo7rmxLEgOkKXf8WHNW4Nt0GS",
51+
"tags": []
52+
}
53+
],
54+
"global_parameters": [
55+
{
56+
"name": "cluster_tags",
57+
"value": []
58+
}
59+
],
60+
"topic_permissions": []
61+
}

.rabbitmq/rabbitmq.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,5 @@ listeners.ssl.default = 5671
22
ssl_options.cacertfile = /.rabbitmq/ca_certificate.pem
33
ssl_options.certfile = /.rabbitmq/server_localhost_certificate.pem
44
ssl_options.keyfile = /.rabbitmq/server_localhost_key.pem
5+
definitions.import_backend = local_filesystem
6+
definitions.local.path = /.rabbitmq/definitions.json

setup.cfg

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
[metadata]
22
name = varys-client
3-
version = 1.1.1
3+
version = 1.2.0
44
author = Sam Wilkinson
55
author_email = s.a.j.wilkinson@bham.ac.uk
6-
description = A pika-based python RabbitMQ client for use in the CLIMB-tree project
6+
description = A pika-based python RabbitMQ client for use in the CLIMB-GRE project
77
long_description = file: README.md
88
long_description_content_type = text/markdown
99
url = https://github.com/climb-tre/varys

tests/test_varys.py

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,10 @@ def quick_turnaround(self):
118118
self.setUp()
119119
# timeout seems to need to be at least 0.01s
120120
received_messages = [
121-
message.body.decode()[1:-1] for message in self.v.receive_batch('test_varys', queue_suffix='q', timeout=0.1)
121+
message.body.decode()[1:-1]
122+
for message in self.v.receive_batch(
123+
"test_varys", queue_suffix="q", timeout=0.1
124+
)
122125
]
123126

124127
self.assertEqual(received_messages, sent_messages)
@@ -226,6 +229,118 @@ def test_quick_turnaround(self):
226229
self.quick_turnaround()
227230

228231

232+
class TestVarysPermissions(unittest.TestCase):
233+
234+
def setUp(self):
235+
config = {
236+
"version": "0.1",
237+
"profiles": {
238+
"test": {
239+
"username": "guest2",
240+
"password": "guest",
241+
"amqp_url": "localhost",
242+
"port": 5672,
243+
"use_tls": False,
244+
},
245+
"admin": {
246+
"username": "guest",
247+
"password": "guest",
248+
"amqp_url": "localhost",
249+
"port": 5672,
250+
"use_tls": False,
251+
},
252+
},
253+
}
254+
255+
with open(TMP_FILENAME, "w") as f:
256+
json.dump(config, f, ensure_ascii=False)
257+
258+
# Setup exchange
259+
admin_varys = Varys("admin", LOG_FILENAME, config_path=TMP_FILENAME)
260+
admin_varys.send("setup message", "test-exchange", queue_suffix="test_queue")
261+
admin_varys.close()
262+
263+
credentials = pika.PlainCredentials("guest", "guest")
264+
265+
connection = pika.BlockingConnection(
266+
pika.ConnectionParameters("localhost", credentials=credentials)
267+
)
268+
channel = connection.channel()
269+
270+
channel.queue_purge(queue="test-exchange.test_queue")
271+
272+
with open(TMP_FILENAME, "w") as f:
273+
json.dump(config, f, ensure_ascii=False)
274+
275+
self.v = Varys("test", LOG_FILENAME, config_path=TMP_FILENAME)
276+
277+
def tearDown(self):
278+
# this seems to prevent some hanging
279+
# or errors related to closing connections that haven't opened yet
280+
# I presume because some operations are so fast
281+
# that we try to close the connections before they've opened
282+
# 0.01s seems to be sufficient; 0.1s is just a bit conservative
283+
time.sleep(0.1)
284+
285+
self.v.close()
286+
os.remove(TMP_FILENAME)
287+
time.sleep(0.1)
288+
289+
credentials = pika.PlainCredentials("guest", "guest")
290+
291+
connection = pika.BlockingConnection(
292+
pika.ConnectionParameters("localhost", credentials=credentials)
293+
)
294+
channel = connection.channel()
295+
296+
channel.queue_purge(queue="test-exchange.test_queue")
297+
298+
connection.close()
299+
time.sleep(0.5)
300+
301+
# check that all file handles were dropped for relevant loggers
302+
for logger_name in ["test-exchange", "test-exchange-2", "test-exchange-3"]:
303+
logger = logging.getLogger(logger_name)
304+
self.assertEqual(len(logger.handlers), 0)
305+
306+
def test_not_permitted_declare_fail(self):
307+
self.v.send(TEXT, "test-exchange-2", queue_suffix="test_queue")
308+
time.sleep(0.5)
309+
with open(LOG_FILENAME, "r") as f:
310+
loglines = f.readlines()
311+
312+
self.assertTrue(
313+
any(
314+
"pika.exceptions.ChannelClosedByBroker: (403, " in message
315+
for message in loglines
316+
)
317+
)
318+
319+
def test_send_receive_extant_queue(self):
320+
self.v.send(TEXT, "test-exchange", queue_suffix="test_queue")
321+
message = self.v.receive("test-exchange", queue_suffix="test_queue")
322+
self.assertEqual(TEXT, json.loads(message.body))
323+
324+
logger = logging.getLogger("test-exchange")
325+
self.assertEqual(len(logger.handlers), 1)
326+
327+
def test_send_nonexistant_queue(self):
328+
self.v.send(TEXT, "test-exchange", queue_suffix="test_queue_2")
329+
message = self.v.receive("test-exchange", queue_suffix="test_queue_2")
330+
self.assertEqual(TEXT, json.loads(message.body))
331+
332+
logger = logging.getLogger("test-exchange")
333+
self.assertEqual(len(logger.handlers), 1)
334+
335+
def test_send_nonexistant_exchange(self):
336+
self.v.send(TEXT, "test-exchange-3", queue_suffix="test_queue")
337+
message = self.v.receive("test-exchange-3", queue_suffix="test_queue")
338+
self.assertEqual(TEXT, json.loads(message.body))
339+
340+
logger = logging.getLogger("test-exchange-3")
341+
self.assertEqual(len(logger.handlers), 1)
342+
343+
229344
class TestVarysConfig(unittest.TestCase):
230345
def tearDown(self):
231346
os.remove(TMP_FILENAME)

varys/consumer.py

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import functools
22
import pika
3+
from pika import exceptions as pika_exceptions
34
import time
45

56
from varys.utils import varys_message
@@ -71,12 +72,40 @@ def run(self):
7172

7273
self._connection = pika.BlockingConnection(self._parameters)
7374
self._channel = self._connection.channel()
74-
self._channel.exchange_declare(
75-
exchange=self._exchange,
76-
exchange_type=self._exchange_type,
77-
durable=True,
78-
)
79-
self._channel.queue_declare(queue=self._queue, durable=True)
75+
try:
76+
self._channel.exchange_declare(
77+
exchange=self._exchange,
78+
exchange_type=self._exchange_type,
79+
durable=True,
80+
passive=True,
81+
)
82+
except pika_exceptions.ChannelClosed as e:
83+
if e.reply_code != 404:
84+
raise
85+
86+
self._log.info(
87+
f"Exchange {self._exchange} does not exist, creating it..."
88+
)
89+
self._channel = self._connection.channel()
90+
self._channel.exchange_declare(
91+
exchange=self._exchange,
92+
exchange_type=self._exchange_type,
93+
durable=True,
94+
)
95+
try:
96+
self._channel.queue_declare(
97+
queue=self._queue, durable=True, passive=True
98+
)
99+
except pika_exceptions.ChannelClosed as e:
100+
if e.reply_code != 404:
101+
raise
102+
103+
self._log.info(
104+
f"Queue {self._queue} does not exist, creating it..."
105+
)
106+
self._channel = self._connection.channel()
107+
self._channel.queue_declare(queue=self._queue, durable=True)
108+
80109
self._channel.queue_bind(
81110
queue=self._queue,
82111
exchange=self._exchange,

varys/controller.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def send(
7575
self,
7676
message,
7777
exchange,
78-
queue_suffix=False,
78+
queue_suffix="",
7979
exchange_type="fanout",
8080
max_attempts=1,
8181
reconnect_wait=10,
@@ -109,7 +109,7 @@ def send(
109109
def receive(
110110
self,
111111
exchange,
112-
queue_suffix=False,
112+
queue_suffix="",
113113
timeout=None,
114114
exchange_type="fanout",
115115
prefetch_count=5,

varys/producer.py

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import functools
22
import pika
3+
from pika import exceptions as pika_exceptions
34
import time
45
import json
56

@@ -46,8 +47,17 @@ def publish_message(self, message, max_attempts=3):
4647

4748
attempt = 0
4849
while attempt < max_attempts:
50+
attempt += 1
51+
52+
if self._connection is None or self._connection.is_closed:
53+
self._log.warning(
54+
"Connection is closed, cannot publish message, attempting to reconnect..."
55+
)
56+
if self._reconnect_wait > 0:
57+
time.sleep(self._reconnect_wait)
58+
continue
59+
4960
try:
50-
attempt += 1
5161
self._log.info(
5262
f"Sending message (attempt {attempt}): {json.dumps(message)}"
5363
)
@@ -85,12 +95,41 @@ def run(self):
8595
try:
8696
self._connection = pika.BlockingConnection(self._parameters)
8797
self._channel = self._connection.channel()
88-
self._channel.exchange_declare(
89-
exchange=self._exchange,
90-
exchange_type=self._exchange_type,
91-
durable=True,
92-
)
93-
self._channel.queue_declare(queue=self._queue, durable=True)
98+
try:
99+
self._channel.exchange_declare(
100+
exchange=self._exchange,
101+
exchange_type=self._exchange_type,
102+
durable=True,
103+
passive=True,
104+
)
105+
except pika_exceptions.ChannelClosed as e:
106+
if e.reply_code != 404:
107+
raise
108+
109+
self._log.info(
110+
f"Exchange {self._exchange} does not exist, creating it..."
111+
)
112+
self._channel = self._connection.channel()
113+
self._channel.exchange_declare(
114+
exchange=self._exchange,
115+
exchange_type=self._exchange_type,
116+
durable=True,
117+
)
118+
119+
try:
120+
self._channel.queue_declare(
121+
queue=self._queue, durable=True, passive=True
122+
)
123+
except pika_exceptions.ChannelClosed as e:
124+
if e.reply_code != 404:
125+
raise
126+
127+
self._log.info(
128+
f"Queue {self._queue} does not exist, creating it..."
129+
)
130+
self._channel = self._connection.channel()
131+
self._channel.queue_declare(queue=self._queue, durable=True)
132+
94133
self._channel.queue_bind(
95134
queue=self._queue,
96135
exchange=self._exchange,

0 commit comments

Comments
 (0)