Hi i'm currently testing out deploying metaflow to...
# ask-metaflow
r
Hi, I'm currently testing out deploying Metaflow to Airflow, but my converted Airflow DAGs never actually run. Locally the workflow does work. Any help would be greatly appreciated. Thanks!
1
a
@refined-zebra-23040 what is the error that you see?
r
[2024-09-12T15:25:43.232+0000] {kubernetes_executor_utils.py:157} INFO - Event: and now my watch begins starting at resource_version: 0
[2024-09-12T152558.287+0000] {kubernetes_executor.py:239} INFO - Found 0 queued task instances [2024-09-12T152604.651+0000] {dagrun.py:795} INFO - Marking run <DagRun ClickhouseQueryFlow @ 2024-09-12 152604.146014+0000 manual__2024-09-12T152604.146014+00:00, state:running, queued_at: 2024-09-12 152604.163666+00:00. externally triggered: True> successful [2024-09-12T152604.653+0000] {dagrun.py:846} INFO - DagRun Finished: dag_id=ClickhouseQueryFlow, execution_date=2024-09-12 152604.146014+00:00, run_id=manual__2024-09-12T152604.146014+00:00, run_start_date=2024-09-12 152604.625967+00:00, run_end_date=2024-09-12 152604.652683+00:00, run_duration=0.026716, state=success, external_trigger=True, run_type=manual, data_interval_start=2024-09-12 152604.146014+00:00, data_interval_end=2024-09-12 152604.146014+00:00, dag_hash=99a202d434a72e4869ac27050f19a2b6 [2024-09-12T152613.248+0000] {kubernetes_executor_utils.py:121} ERROR - Unknown error in KubernetesJobWatcher. Failing Traceback (most recent call last): File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 761, in _update_chunk_length self.chunk_left = int(line, 16) ValueError: invalid literal for int() with base 16: b'' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 444, in _error_catcher yield File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 828, in read_chunked self._update_chunk_length() File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 765, in _update_chunk_length raise InvalidChunkLength(self, line) urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read) During handling of the above exception, another exception occurred: Traceback (most recent call last): File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 112, in run self.resource_version = self._run( File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 168, in _run for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs): File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/watch/watch.py", line 178, in stream for line in iter_resp_lines(resp): File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines for segment in resp.stream(amt=None, decode_content=False): File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 624, in stream for line in self.read_chunked(amt, decode_content=decode_content): File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 857, in read_chunked self._original_response.close() File "/usr/local/lib/python3.9/contextlib.py", line 137, in exit self.gen.throw(typ, value, traceback) File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 461, in _error_catcher raise ProtocolError("Connection broken: %r" % e, e) urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read)) Process KubernetesJobWatcher-3038: Traceback (most recent call last): File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 761, in _update_chunk_length self.chunk_left = int(line, 16) ValueError: invalid literal for int() with base 16: b'' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 444, in _error_catcher yield File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 828, in 
read_chunked self._update_chunk_length() File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 765, in _update_chunk_length raise InvalidChunkLength(self, line) urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read) During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 112, in run self.resource_version = self._run( File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 168, in _run for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs): File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/watch/watch.py", line 178, in stream for line in iter_resp_lines(resp): File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines for segment in resp.stream(amt=None, decode_content=False): File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 624, in stream for line in self.read_chunked(amt, decode_content=decode_content): File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 857, in read_chunked self._original_response.close() File "/usr/local/lib/python3.9/contextlib.py", line 137, in exit self.gen.throw(typ, value, traceback) File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 461, in _error_catcher raise ProtocolError("Connection broken: %r" % e, e) urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read)) [2024-09-12T152613.335+0000] {kubernetes_executor_utils.py:387} ERROR - Error while health 
checking kube watcher process for namespace airflow. Process died for unknown reasons [2024-09-12T152613.351+0000] {kubernetes_executor_utils.py:157} INFO - Event: and now my watch begins starting at resource_version: 0
a
Does this happen for every flow or just this flow? Can you try a very simple linear flow? Also, can you tell us what your Airflow setup looks like and how it is connected to your Kubernetes cluster?
r
Simpler flows, e.g. a parameter flow, are working fine. The funny thing is that the failing flow is instantly marked "success" without actually running any tasks.
a
The simpler flows have the same behavior? ^
r
no the simpler flows are working fine (linear flow, parameter flow etc.)
a
Gotcha - is there an example flow you are able to share with us that fails?
r
Copy code
from metaflow import FlowSpec, step, environment, kubernetes, conda
import os

class ClickhouseQueryFlow(FlowSpec):
    """Run a query against ClickHouse and post-process the rows.

    Connection settings come from the CLICKHOUSE_* environment variables.
    NOTE(review): the @environment decorator captures ``os.getenv(...)`` at
    flow *compile* time, i.e. on the machine that generates the Airflow DAG —
    if the variables are unset there, the steps receive None at runtime.
    Confirm the variables are present in the deployment environment.
    """

    @environment(vars={
        "CLICKHOUSE_HOST": os.getenv("CLICKHOUSE_HOST"),
        "CLICKHOUSE_USER": os.getenv("CLICKHOUSE_USER"),
        "CLICKHOUSE_PASSWORD": os.getenv("CLICKHOUSE_PASSWORD"),
        "CLICKHOUSE_PORT": os.getenv("CLICKHOUSE_PORT"),
    })
    @conda(packages={'clickhouse-connect': '0.7.19'})
    @step
    def start(self):
        """Connect to ClickHouse and execute a query.

        Stores the connection parameters and the query result rows as
        flow artifacts (self.host, self.user, ..., self.results).
        """
        # Imported inside the step so it resolves from the @conda environment.
        import clickhouse_connect

        self.host = os.getenv('CLICKHOUSE_HOST')
        self.user = os.getenv('CLICKHOUSE_USER')
        self.password = os.getenv('CLICKHOUSE_PASSWORD')
        self.port = os.getenv('CLICKHOUSE_PORT')

        # Initialize ClickHouse connection. get_client expects an int port;
        # os.getenv returns a string, so cast. Falling back to None lets the
        # client pick its default port for the chosen scheme.
        client = clickhouse_connect.get_client(
            host=self.host,
            username=self.user,
            password=self.password,
            port=int(self.port) if self.port else None,
            secure=True
        )

        # Example query: fetch the catalog of tables.
        query = "SELECT * FROM system.tables;"
        self.results = client.query(query).result_rows

        print("Query executed successfully!")
        print(self.results)

        self.next(self.process_results)

    @step
    def process_results(self):
        """Process the results fetched from ClickHouse.

        For this example we simply print each row; self.results is
        populated by the start step.
        """
        print("Processing results...")
        for row in self.results:
            print(row)

        self.next(self.end)

    @step
    def end(self):
        """Terminal step: confirm the flow finished."""
        print("Flow completed!")

if __name__ == '__main__':
    # Instantiating the FlowSpec hands control to Metaflow's CLI entry
    # point (run / deploy / airflow create, etc.) when run as a script.
    ClickhouseQueryFlow()
Yes, here it is — thanks for the help.
a
Does this flow execute if you don’t have environment decorator?
r
I updated the simple flow and now no flow is running. I guess this has something to do with my environment variables, as this is the only thing that changed in between.