Distributed API #885
The request to get agents in the cluster has been replaced with the distributed API request, so it is no longer necessary.
Force-pushed from 3e2263a to b998caf
…inct API requests
Testing with 1K agents and 3 nodes. Found errors:

API calls weren't correctly forwarded to nodes that have agents with IDs higher than 500.

Summary of yesterday's errors:
```python
old_limit = common.database_limit if 'limit' not in input_json['arguments'] else input_json['arguments']['limit']
if old_offset > 0:
    input_json['arguments']['offset'] = 0
    input_json['arguments']['limit'] = None
```
Why do you need to get all the information? You could forward the current limit and offset; then, in the merge function, you would need to apply them again, right?
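A minimal sketch of the suggestion above (function names here are hypothetical, not the actual framework API): instead of resetting `offset=0` and `limit=None` and fetching everything, each node only needs the first `offset + limit` items, and the merge step re-applies the window over the combined result.

```python
# Hypothetical sketch; not the real Wazuh framework code.

def forward_arguments(arguments, default_limit=500):
    """Build the arguments forwarded to each node.

    Each node returns at most offset+limit items, which is enough
    for the merge step to re-apply the caller's window.
    """
    offset = arguments.get('offset', 0)
    limit = arguments.get('limit', default_limit)
    return dict(arguments, offset=0, limit=offset + limit)


def merge_results(node_results, offset=0, limit=500):
    """Combine per-node item lists and re-apply offset/limit."""
    merged = sorted(item for items in node_results for item in items)
    return merged[offset:offset + limit]


# Example: two nodes each return a partial, already-sorted slice.
print(merge_results([[1, 3, 5], [2, 4, 6]], offset=1, limit=3))  # → [2, 3, 4]
```

The trade-off: each node still sends up to `offset + limit` items over the wire, but the master never has to materialize the full unpaginated result.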
```python
else:
    if node_name == 'unknown' or node_name == '':
        raise WazuhException(3017)

command = 'dapi_forward {}'.format(node_name) if node_name != master_name else 'dapi'
```
Duplicated code? Is it in 'forward_list'?
```python
def get_solver_node(input_json, master_name):
    """
    Gets the node that can solve a request.
```

```python
import wazuh.ciscat as ciscat
import wazuh.active_response as active_response
```
Describe every type.
```python
response = self.get_worker_info(c_name)['handler'].execute(command, data)
yield c_name, response
```
```python
class AbstractWorker(Handler):
```
It should be "AbstractClient". Then, use Worker for worker nodes and Client for InternalSocket.
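A sketch of the renaming being requested (class bodies here are placeholders, not the real Wazuh code): "Client" names the protocol role, while "Worker" stays reserved for worker cluster nodes.

```python
# Illustrative hierarchy only; method bodies are stand-ins.

class Handler:
    """Base protocol handler (stand-in for the real class)."""
    def execute(self, command, data):
        return 'ok {} {}'.format(command, data)


class AbstractClient(Handler):
    """Generic client side of the cluster protocol (was AbstractWorker)."""
    def __init__(self):
        self.my_connected = False

    def is_connected(self):
        return self.my_connected


class WorkerHandler(AbstractClient):
    """A worker *node*'s client connection to the master."""


class InternalSocketClient(AbstractClient):
    """Local socket client (was InternalSocketWorker)."""
```

With this split, the class name tells you which side of the protocol an object is on, independently of what kind of node it runs in.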
```python
return self.my_connected
```
```python
class WorkerHandler(AbstractWorker):
```
Change AbstractWorker
```python
time.sleep(2)
```
```python
class InternalSocketWorker(communication.AbstractWorker):
```
It should be AbstractClient
```python
def run(self):
    while not self.stopper.is_set() and self.running:
        name, id, request = self.request_queue.get(block=True).split(' ', 2)
        result = distribute_function(json.loads(request), from_master=True)
```
The queue will be blocked until this function ends. What happens if it never ends (or takes a very long time)?
- Adding a timeout would require changing every framework function to add an "exit condition" at every point where the code could block (I/O wait, DB, etc.).
- The queue could create a thread for every request:
  - After a timeout, continue with the next request (returning a timeout error)
  - Log a warning
  - Problem: the previous thread will not be killed. What will happen when it ends?
  - Set a limit of 20 "zombie threads" -> warning and block the queue? -> next requests: "queue is blocked".
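The thread-per-request idea above can be sketched as follows. This is a toy illustration, not the Wazuh implementation; the timeout value and the `serve`/`handle_request` names are assumptions, and only the zombie limit of 20 comes from the discussion.

```python
# Toy sketch of thread-per-request with a timeout and a zombie-thread limit.
import threading

MAX_ZOMBIES = 20       # limit discussed above
REQUEST_TIMEOUT = 5    # seconds; assumed for illustration


def serve(request_queue, handle_request):
    """Drain the queue, running each request in its own thread."""
    zombies = []
    while True:
        request = request_queue.get(block=True)
        if request is None:                 # sentinel: stop serving
            break
        # Forget threads that eventually finished on their own.
        zombies = [t for t in zombies if t.is_alive()]
        if len(zombies) >= MAX_ZOMBIES:
            print('queue is blocked')       # refuse new work
            continue
        t = threading.Thread(target=handle_request, args=(request,), daemon=True)
        t.start()
        t.join(timeout=REQUEST_TIMEOUT)
        if t.is_alive():                    # timed out: move on, thread lingers
            print('warning: request timed out')
            zombies.append(t)
```

Note the open problem from the discussion remains: a timed-out thread cannot be killed, so it lingers as a daemon until it returns by itself.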
The same API call (PUT /agents/restart):
- With threads: 1.0136449337 seconds
- Without threads: 0.634363174438 seconds

Is this really worth it? I don't think so.
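For reference, timings like the ones above can be taken with a tiny helper (illustrative only; `timed` is not part of the framework):

```python
import time


def timed(func, *args, **kwargs):
    """Return (result, elapsed_seconds) for a single call."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    return result, time.perf_counter() - start


# Example with a stand-in for the API call:
_, elapsed = timed(sum, range(1000))
print('{:.10f} seconds'.format(elapsed))
```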
…ndler

Worker is a word reserved to refer to a type of cluster node. The AbstractWorker and WorkerHandler classes represented a client in the cluster protocol, not a worker node.
Hello team,
This PR fixes #670 and adds framework support for wazuh/wazuh-api#126.
Best regards,
Marta