Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions airflow/providers/apache/hive/hooks/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def _prepare_cli_cmd(self) -> list[Any]:

if self.use_beeline:
hive_bin = "beeline"
self._validate_beeline_parameters(conn)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to push this validation to the constructor? The Connection object is retrieved there as well as determining the use_beeline attr. Could fail faster before even executing the task. WDYT?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, didn't notice maybe makes sense here since we have conn in __init__.
Not related to changes in this PR but should we think of moving self.get_connection(hive_cli_conn_id) call outside __init__ otherwise it will make a db call to get conn whenever dag parsing will happen?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The db call would only happen if an operator using this hook constructed the hook in its __init__. There is almost an instance of that in the HiveOperator, but it's to setup lazy loading the hook and make it an instance attr.

But yes, ideally the constructors of hooks, operators, sensor, etc. don't make any external calls

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally, If you initialize connection in init you cannot make connection_id templated - even if you extend the operator.

While I used to think the same @josh-fell that early validation helps to surface such problems earlier, I think It has evolved and I am now of an opinion that constructors in our operators should just assign fields. Full stop.

And in the past (cannot find it easily) @uranusjr proposed that we get rid of explicit constructor and turn all our operators in 'dataclasses' https://peps.python.org/pep-0557/ - where you have only fields declared and all the init boilerplate is generated.

And I tend to agree that would be a good idea.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we need to avoid anything which might cause call to Airflow DB or any other resource.
There is situation when hook initialised in operator not lazily and even expected as one of argument:
SSHHook and SSHOperator

Also I agree that operator should only allow fields declared however even if I'am a fan of dataclasses there is couple of issues exists:

  1. __post_init__, there will be temptation to use it.
  2. A bit dumb inheritance, you should decorate child class with @dataclass otherwise it turned into the regular class with part of dataclass 🙄
  3. kwargs-only dataclasses introduced only in Python 3.10+

@josh-fell josh-fell Feb 15, 2023

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am now of an opinion that constructors in our operators should just assign fields. Full stop.

Yes, same. I guess I figured since this was a hook, we're not moving the connection call out, and it's currently not going to get called in an operator, why not push up the validation? But I see it was misguided suggestion.

...get rid of explicit constructor and turn all our operators in 'dataclasses'

I would be over the moon if this was the implementation or attrs; the latter has been life changing. It's a shame you can't build an operator now, with any real added value in simplication, with attrs because of the metaclass logic going on. Alas, I would love this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alas, I would love this.

I share the sentiment :)

jdbc_url = f"jdbc:hive2://{conn.host}:{conn.port}/{conn.schema}"
if conf.get("core", "security") == "kerberos":
template = conn.extra_dejson.get("principal", "hive/_HOST@EXAMPLE.COM")
Expand All @@ -165,6 +166,22 @@ def _prepare_cli_cmd(self) -> list[Any]:

return [hive_bin] + cmd_extra + hive_params_list

def _validate_beeline_parameters(self, conn):
if ":" in conn.host or "/" in conn.host or ";" in conn.host:
raise Exception(
f"The host used in beeline command ({conn.host}) should not contain ':/;' characters)"
)
try:
int_port = int(conn.port)
if int_port <= 0 or int_port > 65535:
raise Exception(f"The port used in beeline command ({conn.port}) should be in range 0-65535)")
except (ValueError, TypeError) as e:
raise Exception(f"The port used in beeline command ({conn.port}) should be a valid integer: {e})")
if ";" in conn.schema:
raise Exception(
f"The schema used in beeline command ({conn.schema}) should not contain ';' character)"
)

@staticmethod
def _prepare_hiveconf(d: dict[Any, Any]) -> list[Any]:
"""
Expand Down
33 changes: 32 additions & 1 deletion tests/providers/apache/hive/hooks/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from airflow.exceptions import AirflowException
from airflow.models.connection import Connection
from airflow.models.dag import DAG
from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook, HiveServer2Hook
from airflow.providers.apache.hive.hooks.hive import HiveCliHook, HiveMetastoreHook, HiveServer2Hook
from airflow.secrets.environment_variables import CONN_ENV_PREFIX
from airflow.utils import timezone
from airflow.utils.operator_helpers import AIRFLOW_VAR_NAME_FORMAT_MAPPING
Expand Down Expand Up @@ -641,6 +641,37 @@ def test_get_conn_with_password(self, mock_connect):
database="default",
)

@pytest.mark.parametrize(
"host, port, schema, message",
[
("localhost", "10000", "default", None),
("localhost:", "10000", "default", "The host used in beeline command"),
(";ocalhost", "10000", "default", "The host used in beeline command"),
(";ocalho/", "10000", "default", "The host used in beeline command"),
("localhost", "as", "default", "The port used in beeline command"),
("localhost", "0;", "default", "The port used in beeline command"),
("localhost", "10/", "default", "The port used in beeline command"),
("localhost", ":", "default", "The port used in beeline command"),
("localhost", "-1", "default", "The port used in beeline command"),
("localhost", "655536", "default", "The port used in beeline command"),
("localhost", "1234", "default;", "The schema used in beeline command"),
],
)
def test_get_conn_with_wrong_connection_parameters(self, host, port, schema, message):
connection = Connection(
conn_id="test",
conn_type="hive",
host=host,
port=port,
schema=schema,
)
hook = HiveCliHook()
if message:
with pytest.raises(Exception, match=message):
hook._validate_beeline_parameters(connection)
else:
hook._validate_beeline_parameters(connection)

def test_get_records(self):
hook = MockHiveServer2Hook()
query = f"SELECT * FROM {self.table}"
Expand Down