Skip to content
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
de7489d
add permissions tests (should fail)
BioWilko Dec 2, 2025
1fca027
add definitions json
BioWilko Dec 2, 2025
e62decd
use real definitions json
BioWilko Dec 3, 2025
61ac67b
initialise exchange with local admin account
BioWilko Dec 3, 2025
cba5b5c
purge queue during setup
BioWilko Dec 3, 2025
460cf8c
handle pre-existing exchanges better
BioWilko Dec 3, 2025
b217e0d
re-open channel after 404 on exchange declare
BioWilko Dec 3, 2025
3cfc44e
dump config to file properly
BioWilko Dec 3, 2025
c876bc3
fix fake varys config again
BioWilko Dec 3, 2025
f112c61
update perms
BioWilko Dec 3, 2025
a5bd3f2
fix perms typo in definition
BioWilko Dec 3, 2025
bbc7f18
fix missing queue in teardown purge
BioWilko Dec 3, 2025
00e380d
add more tests
BioWilko Dec 3, 2025
b723d27
fix logger checks
BioWilko Dec 3, 2025
3f28206
remove exception assertion
BioWilko Dec 3, 2025
ff7475e
properly raise exceptions after logging
BioWilko Dec 3, 2025
35afe12
try to raise exception outside of catch
BioWilko Dec 3, 2025
73b501d
specifically test for 403
BioWilko Dec 3, 2025
6b10da6
text for exception presence inside logfile instead
BioWilko Dec 3, 2025
44530e7
use caplog properly
BioWilko Dec 3, 2025
b5051df
print caplog text
BioWilko Dec 3, 2025
5ca3ffa
try using unittest logging func instead
BioWilko Dec 3, 2025
0c753bf
read the logfile manually like a caveman
BioWilko Dec 3, 2025
97dc8dc
add egg-info to gitirgnore
BioWilko Dec 3, 2025
6b33301
Bump version tag
BioWilko Dec 3, 2025
446c3be
recommended changes
BioWilko Dec 3, 2025
72d9d96
update package description
BioWilko Dec 3, 2025
03b6c8f
update logger closed check
BioWilko Dec 3, 2025
4298ef3
Add catch for stopped connection in producer
BioWilko Dec 5, 2025
766377b
no need to re-declare exchanges which should always exist at this point
BioWilko Dec 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
chmod a+rwx .rabbitmq/*
- name: Run RabbitMQ server
run: |
docker run -d -v ./.rabbitmq:/.rabbitmq -e RABBITMQ_CONFIG_FILE=/.rabbitmq/rabbitmq.conf -p 5671:5671 -p 5672:5672 rabbitmq
docker run -d -v ./.rabbitmq:/.rabbitmq -e RABBITMQ_CONFIG_FILE=/.rabbitmq/rabbitmq.conf -p 5671:5671 -p 5672:5672 rabbitmq:4.2.1
- name: Install dependencies
run: |
python -m pip install --upgrade pip
Expand All @@ -47,7 +47,7 @@ jobs:
run: |
# sometimes the scripts hang because the channels can't be closed,
# so run under a short timeout
timeout 60s pytest -v --cov-report=term-missing --cov=varys
timeout 120s pytest -v --cov-report=term-missing --cov=varys
- name: "Upload Logfile"
uses: actions/upload-artifact@v4
with:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ varys/__pycache__*
tests/__pycache__*
tests/test.log
varys.egg-info/*
varys_client.egg-info/*
.pytest_cache*
61 changes: 61 additions & 0 deletions .rabbitmq/definitions.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
{
"permissions": [
{
"configure": ".*",
"read": ".*",
"user": "guest",
"vhost": "/",
"write": ".*"
},
{
"configure": "test-exchange\\.\\w*|test-exchange-3\\.\\w*|test-exchange-3",
Comment thread
BioWilko marked this conversation as resolved.
"read": "test-exchange|test-exchange\\.\\w.*|test-exchange-3|test-exchange-3\\.\\w.*",
"user": "guest2",
"vhost": "/",
"write": "test-exchange|test-exchange\\.\\w.*|test-exchange-3|test-exchange-3\\.\\w.*"
Comment thread
BioWilko marked this conversation as resolved.
}
],
"bindings": [],
"queues": [],
"parameters": [],
"policies": [],
"rabbitmq_version": "4.2.1",
"rabbit_version": "4.2.1",
"exchanges": [],
"vhosts": [
{
"limits": [],
"metadata": {
"description": "Default virtual host",
"tags": [],
"default_queue_type": "classic"
},
"name": "/"
}
],
"users": [
{
"hashing_algorithm": "rabbit_password_hashing_sha256",
"limits": {},
"name": "guest",
"password_hash": "LzA8uEF2TYeMtmxczECa40VAIOdZmg08i6R+L4XEeg9b62Br",
"tags": [
"administrator"
]
},
{
"hashing_algorithm": "rabbit_password_hashing_sha256",
"limits": {},
"name": "guest2",
"password_hash": "nV8dlnyFAnteKsCYrrHUPmuNo7rmxLEgOkKXf8WHNW4Nt0GS",
"tags": []
}
],
"global_parameters": [
{
"name": "cluster_tags",
"value": []
}
],
"topic_permissions": []
}
2 changes: 2 additions & 0 deletions .rabbitmq/rabbitmq.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ listeners.ssl.default = 5671
ssl_options.cacertfile = /.rabbitmq/ca_certificate.pem
ssl_options.certfile = /.rabbitmq/server_localhost_certificate.pem
ssl_options.keyfile = /.rabbitmq/server_localhost_key.pem
definitions.import_backend = local_filesystem
definitions.local.path = /.rabbitmq/definitions.json
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[metadata]
name = varys-client
version = 1.1.1
version = 1.2.0
author = Sam Wilkinson
author_email = s.a.j.wilkinson@bham.ac.uk
description = A pika-based python RabbitMQ client for use in the CLIMB-tree project
description = A pika-based python RabbitMQ client for use in the CLIMB-GRE project
long_description = file: README.md
long_description_content_type = text/markdown
url = https://github.com/climb-tre/varys
Expand Down
116 changes: 115 additions & 1 deletion tests/test_varys.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ def quick_turnaround(self):
self.setUp()
# timeout seems to need to be at least 0.01s
received_messages = [
message.body.decode()[1:-1] for message in self.v.receive_batch('test_varys', queue_suffix='q', timeout=0.1)
message.body.decode()[1:-1]
for message in self.v.receive_batch(
"test_varys", queue_suffix="q", timeout=0.1
)
]

self.assertEqual(received_messages, sent_messages)
Expand Down Expand Up @@ -226,6 +229,117 @@ def test_quick_turnaround(self):
self.quick_turnaround()


class TestVarysPermissions(unittest.TestCase):

def setUp(self):
config = {
"version": "0.1",
"profiles": {
"test": {
"username": "guest2",
"password": "guest",
"amqp_url": "localhost",
"port": 5672,
"use_tls": False,
},
"admin": {
"username": "guest",
"password": "guest",
"amqp_url": "localhost",
"port": 5672,
"use_tls": False,
},
},
}

with open(TMP_FILENAME, "w") as f:
json.dump(config, f, ensure_ascii=False)

# Setup exchange
admin_varys = Varys("admin", LOG_FILENAME, config_path=TMP_FILENAME)
admin_varys.send("setup message", "test-exchange", queue_suffix="test_queue")
admin_varys.close()

credentials = pika.PlainCredentials("guest", "guest")

connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost", credentials=credentials)
)
channel = connection.channel()

channel.queue_purge(queue="test-exchange.test_queue")
Comment thread
BioWilko marked this conversation as resolved.

with open(TMP_FILENAME, "w") as f:
json.dump(config, f, ensure_ascii=False)

self.v = Varys("test", LOG_FILENAME, config_path=TMP_FILENAME)

def tearDown(self):
# this seems to prevent some hanging
# or errors related to closing connections that haven't opened yet
# I presume because some operations are so fast
# that we try to close the connections before they've opened
# 0.01s seems to be sufficient; 0.1s is just a bit conservative
time.sleep(0.1)

self.v.close()
os.remove(TMP_FILENAME)
time.sleep(0.1)

credentials = pika.PlainCredentials("guest", "guest")

connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost", credentials=credentials)
)
channel = connection.channel()

channel.queue_purge(queue="test-exchange.test_queue")
Comment thread
BioWilko marked this conversation as resolved.

connection.close()
time.sleep(0.5)

# check that all file handles were dropped
logger = logging.getLogger("test_varys")
self.assertEqual(len(logger.handlers), 0)
Comment thread
BioWilko marked this conversation as resolved.
Outdated

def test_not_permitted_declare_fail(self):
self.v.send(TEXT, "test-exchange-2", queue_suffix="test_queue")
time.sleep(0.5)
with open(LOG_FILENAME, "r") as f:
loglines = f.readlines()

self.assertTrue(
any(
"pika.exceptions.ChannelClosedByBroker: (403, " in message
for message in loglines
)
)

def test_send_receive_extant_queue(self):
self.v.send(TEXT, "test-exchange", queue_suffix="test_queue")
message = self.v.receive("test-exchange", queue_suffix="test_queue")
self.assertEqual(TEXT, json.loads(message.body))

logger = logging.getLogger("test-exchange")
self.assertEqual(len(logger.handlers), 1)

def test_send_nonexistant_queue(self):
Comment thread
BioWilko marked this conversation as resolved.
Comment thread
BioWilko marked this conversation as resolved.
self.v.send(TEXT, "test-exchange", queue_suffix="test_queue_2")
message = self.v.receive("test-exchange", queue_suffix="test_queue_2")
self.assertEqual(TEXT, json.loads(message.body))

logger = logging.getLogger("test-exchange")
self.assertEqual(len(logger.handlers), 1)

def test_send_nonexistant_exchange(self):
Comment thread
BioWilko marked this conversation as resolved.
Comment thread
BioWilko marked this conversation as resolved.
self.v.send(TEXT, "test-exchange-3", queue_suffix="test_queue")
message = self.v.receive("test-exchange-3", queue_suffix="test_queue")
self.assertEqual(TEXT, json.loads(message.body))

logger = logging.getLogger("test-exchange-3")
self.assertEqual(len(logger.handlers), 1)


class TestVarysConfig(unittest.TestCase):
def tearDown(self):
os.remove(TMP_FILENAME)
Expand Down
47 changes: 41 additions & 6 deletions varys/consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import functools
import pika
from pika import exceptions as pika_exceptions
import time

from varys.utils import varys_message
Expand Down Expand Up @@ -71,12 +72,46 @@ def run(self):

self._connection = pika.BlockingConnection(self._parameters)
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=self._exchange,
exchange_type=self._exchange_type,
durable=True,
)
self._channel.queue_declare(queue=self._queue, durable=True)
try:
self._channel.exchange_declare(
exchange=self._exchange,
exchange_type=self._exchange_type,
durable=True,
passive=True,
)
except pika_exceptions.ChannelClosed as e:
if e.reply_code != 404:
raise

self._log.info(
f"Exchange {self._exchange} does not exist, creating it..."
)
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=self._exchange,
exchange_type=self._exchange_type,
durable=True,
)
try:
self._channel.queue_declare(
queue=self._queue, durable=True, passive=True
)
except pika_exceptions.ChannelClosed as e:
if e.reply_code != 404:
raise

self._log.info(
f"Queue {self._queue} does not exist, creating it..."
)
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=self._exchange,
exchange_type=self._exchange_type,
durable=True,
passive=True,
Comment thread
BioWilko marked this conversation as resolved.
Outdated
)
Comment thread
BioWilko marked this conversation as resolved.
Outdated
self._channel.queue_declare(queue=self._queue, durable=True)

self._channel.queue_bind(
queue=self._queue,
exchange=self._exchange,
Expand Down
4 changes: 2 additions & 2 deletions varys/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def send(
self,
message,
exchange,
queue_suffix=False,
queue_suffix="",
exchange_type="fanout",
max_attempts=1,
reconnect_wait=10,
Expand Down Expand Up @@ -109,7 +109,7 @@ def send(
def receive(
self,
exchange,
queue_suffix=False,
queue_suffix="",
timeout=None,
exchange_type="fanout",
prefetch_count=5,
Expand Down
48 changes: 42 additions & 6 deletions varys/producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import functools
import pika
from pika import exceptions as pika_exceptions
import time
import json

Expand Down Expand Up @@ -85,12 +86,47 @@ def run(self):
try:
self._connection = pika.BlockingConnection(self._parameters)
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=self._exchange,
exchange_type=self._exchange_type,
durable=True,
)
self._channel.queue_declare(queue=self._queue, durable=True)
try:
self._channel.exchange_declare(
exchange=self._exchange,
exchange_type=self._exchange_type,
durable=True,
passive=True,
)
except pika_exceptions.ChannelClosed as e:
if e.reply_code != 404:
raise

self._log.info(
f"Exchange {self._exchange} does not exist, creating it..."
)
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=self._exchange,
exchange_type=self._exchange_type,
durable=True,
)

try:
self._channel.queue_declare(
queue=self._queue, durable=True, passive=True
)
except pika_exceptions.ChannelClosed as e:
if e.reply_code != 404:
raise

self._log.info(
f"Queue {self._queue} does not exist, creating it..."
)
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=self._exchange,
exchange_type=self._exchange_type,
durable=True,
passive=True,
)
Comment thread
BioWilko marked this conversation as resolved.
Outdated
self._channel.queue_declare(queue=self._queue, durable=True)

self._channel.queue_bind(
queue=self._queue,
exchange=self._exchange,
Expand Down