Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 43 additions & 41 deletions lib/rabbitmq/http/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ def protocol_ports
reduce(Hash.new) { |acc, lnr| acc[lnr.protocol] = lnr.port; acc }
end

def list_nodes
decode_resource_collection(@connection.get("nodes"))
def list_nodes(query = {})
decode_resource_collection(@connection.get("nodes", query))
end

def node_info(name)
decode_resource(@connection.get("nodes/#{uri_encode(name)}"))
end

def list_extensions
decode_resource_collection(@connection.get("extensions"))
def list_extensions(query = {})
decode_resource_collection(@connection.get("extensions", query))
end

def list_definitions
Expand All @@ -81,8 +81,8 @@ def upload_definitions(defs)
response.success?
end

def list_connections
decode_resource_collection(@connection.get("connections"))
def list_connections(query = {})
decode_resource_collection(@connection.get("connections", query))
end

def connection_info(name)
Expand All @@ -93,22 +93,22 @@ def close_connection(name)
decode_resource(@connection.delete("connections/#{uri_encode(name)}"))
end

def list_channels
decode_resource_collection(@connection.get("channels"))
def list_channels(query = {})
decode_resource_collection(@connection.get("channels", query))
end

def channel_info(name)
decode_resource(@connection.get("channels/#{uri_encode(name)}"))
end

def list_exchanges(vhost = nil)
def list_exchanges(vhost = nil, query = {})
path = if vhost.nil?
"exchanges"
else
"exchanges/#{uri_encode(vhost)}"
end

decode_resource_collection(@connection.get(path))
decode_resource_collection(@connection.get(path, query))
end

def declare_exchange(vhost, name, attributes = {})
Expand Down Expand Up @@ -137,22 +137,22 @@ def exchange_info(vhost, name)
decode_resource(@connection.get("exchanges/#{uri_encode(vhost)}/#{uri_encode(name)}"))
end

def list_bindings_by_source(vhost, exchange)
decode_resource_collection(@connection.get("exchanges/#{uri_encode(vhost)}/#{uri_encode(exchange)}/bindings/source"))
def list_bindings_by_source(vhost, exchange, query = {})
decode_resource_collection(@connection.get("exchanges/#{uri_encode(vhost)}/#{uri_encode(exchange)}/bindings/source", query))
end

def list_bindings_by_destination(vhost, exchange)
decode_resource_collection(@connection.get("exchanges/#{uri_encode(vhost)}/#{uri_encode(exchange)}/bindings/destination"))
def list_bindings_by_destination(vhost, exchange, query = {})
decode_resource_collection(@connection.get("exchanges/#{uri_encode(vhost)}/#{uri_encode(exchange)}/bindings/destination", query))
end

def list_queues(vhost = nil)
def list_queues(vhost = nil, query = {})
path = if vhost.nil?
"queues"
else
"queues/#{uri_encode(vhost)}"
end

decode_resource_collection(@connection.get(path))
decode_resource_collection(@connection.get(path, query))
end

def queue_info(vhost, name)
Expand All @@ -171,8 +171,8 @@ def delete_queue(vhost, name)
decode_resource(@connection.delete("queues/#{uri_encode(vhost)}/#{uri_encode(name)}"))
end

def list_queue_bindings(vhost, queue)
decode_resource_collection(@connection.get("queues/#{uri_encode(vhost)}/#{uri_encode(queue)}/bindings"))
def list_queue_bindings(vhost, queue, query = {})
decode_resource_collection(@connection.get("queues/#{uri_encode(vhost)}/#{uri_encode(queue)}/bindings", query))
end

def purge_queue(vhost, name)
Expand All @@ -188,18 +188,18 @@ def get_messages(vhost, name, options)
decode_resource_collection(response)
end

def list_bindings(vhost = nil)
def list_bindings(vhost = nil, query = {})
path = if vhost.nil?
"bindings"
else
"bindings/#{uri_encode(vhost)}"
end

decode_resource_collection(@connection.get(path))
decode_resource_collection(@connection.get(path, query))
end

def list_bindings_between_queue_and_exchange(vhost, queue, exchange)
decode_resource_collection(@connection.get("bindings/#{uri_encode(vhost)}/e/#{uri_encode(exchange)}/q/#{uri_encode(queue)}"))
def list_bindings_between_queue_and_exchange(vhost, queue, exchange, query = {})
decode_resource_collection(@connection.get("bindings/#{uri_encode(vhost)}/e/#{uri_encode(exchange)}/q/#{uri_encode(queue)}", query))
end

def queue_binding_info(vhost, queue, exchange, properties_key)
Expand All @@ -219,8 +219,8 @@ def delete_queue_binding(vhost, queue, exchange, properties_key)
resp.success?
end

def list_bindings_between_exchanges(vhost, destination_exchange, source_exchange)
decode_resource_collection(@connection.get("bindings/#{uri_encode(vhost)}/e/#{uri_encode(source_exchange)}/e/#{uri_encode(destination_exchange)}"))
def list_bindings_between_exchanges(vhost, destination_exchange, source_exchange, query = {})
decode_resource_collection(@connection.get("bindings/#{uri_encode(vhost)}/e/#{uri_encode(source_exchange)}/e/#{uri_encode(destination_exchange)}", query))
end

def exchange_binding_info(vhost, destination_exchange, source_exchange, properties_key)
Expand All @@ -242,8 +242,8 @@ def delete_exchange_binding(vhost, destination_exchange, source_exchange, proper
end


def list_vhosts
decode_resource_collection(@connection.get("vhosts"))
def list_vhosts(query = {})
decode_resource_collection(@connection.get("vhosts", query))
end

def vhost_info(name)
Expand All @@ -263,14 +263,14 @@ def delete_vhost(name)



def list_permissions(vhost = nil)
def list_permissions(vhost = nil, query = {})
path = if vhost
"vhosts/#{uri_encode(vhost)}/permissions"
else
"permissions"
end

decode_resource_collection(@connection.get(path))
decode_resource_collection(@connection.get(path, query))
end

def list_permissions_of(vhost, user)
Expand All @@ -291,8 +291,8 @@ def clear_permissions_of(vhost, user)



def list_users
decode_resource_collection(@connection.get("users"))
def list_users(query = {})
decode_resource_collection(@connection.get("users", query))
end

def user_info(name)
Expand All @@ -314,8 +314,8 @@ def delete_user(name)
decode_resource(@connection.delete("users/#{uri_encode(name)}"))
end

def user_permissions(name)
decode_resource_collection(@connection.get("users/#{uri_encode(name)}/permissions"))
def user_permissions(name, query = {})
decode_resource_collection(@connection.get("users/#{uri_encode(name)}/permissions", query))
end

def whoami
Expand All @@ -324,23 +324,23 @@ def whoami



def list_policies(vhost = nil)
def list_policies(vhost = nil, query = {})
path = if vhost
"policies/#{uri_encode(vhost)}"
else
"policies"
end

decode_resource_collection(@connection.get(path))
decode_resource_collection(@connection.get(path, query))
end

def list_policies_of(vhost, name = nil)
def list_policies_of(vhost, name = nil, query = {})
path = if name
"policies/#{uri_encode(vhost)}/#{uri_encode(name)}"
else
"policies/#{uri_encode(vhost)}"
end
decode_resource_collection(@connection.get(path))
decode_resource_collection(@connection.get(path, query))
end

def update_policies_of(vhost, name, attributes)
Expand All @@ -358,22 +358,22 @@ def clear_policies_of(vhost, name)



def list_parameters(component = nil)
def list_parameters(component = nil, query = {})
path = if component
"parameters/#{uri_encode(component)}"
else
"parameters"
end
decode_resource_collection(@connection.get(path))
decode_resource_collection(@connection.get(path, query))
end

def list_parameters_of(component, vhost, name = nil)
def list_parameters_of(component, vhost, name = nil, query = {})
path = if name
"parameters/#{uri_encode(component)}/#{uri_encode(vhost)}/#{uri_encode(name)}"
else
"parameters/#{uri_encode(component)}/#{uri_encode(vhost)}"
end
decode_resource_collection(@connection.get(path))
decode_resource_collection(@connection.get(path, query))
end

def update_parameters_of(component, vhost, name, attributes)
Expand Down Expand Up @@ -436,7 +436,9 @@ def decode_resource(response)
end

def decode_resource_collection(response)
response.body.map { |i| Hashie::Mash.new(i) }
collection = response.body.is_a?(Array) ? response.body : response.body.fetch('items')

collection.map { |i| Hashie::Mash.new(i) }
end
end # Client
end # HTTP
Expand Down
27 changes: 22 additions & 5 deletions spec/integration/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -399,13 +399,30 @@ def await_event_propagation
@channel.close
end

it "returns a list of all queues" do
q = @channel.queue("", durable: false)
context "when no params given" do
it "returns a list of all queues" do
q1 = @channel.queue("", durable: false)
q2 = @channel.queue("", durable: false)

xs = subject.list_queues
expect(xs.detect { |x| x.name == q.name }).to_not be_empty
xs = subject.list_queues
expect(xs.select { |x| [q1.name, q2.name].include?(x.name) }.count).to eq 2

subject.delete_queue("/", q.name)
subject.delete_queue("/", q1.name)
subject.delete_queue("/", q2.name)
end
end

context "when pagination params given" do
it "returns a paginated list of queues" do
q1 = @channel.queue("", durable: false)
q2 = @channel.queue("", durable: false)

xs = subject.list_queues(nil, page: 1, page_size: 1)
expect(xs.count).to eq 1

subject.delete_queue("/", q1.name)
subject.delete_queue("/", q2.name)
end
end
end

Expand Down