refined-zebra-23040
09/12/2024, 10:13 AM
ancient-application-36103
09/12/2024, 2:50 PM
refined-zebra-23040
09/12/2024, 3:27 PM
[2024-09-12T15:25:43.232+0000] {kubernetes_executor_utils.py:157} INFO - Event: and now my watch begins starting at resource_version: 0
[2024-09-12T15:25:58.287+0000] {kubernetes_executor.py:239} INFO - Found 0 queued task instances
[2024-09-12T15:26:04.651+0000] {dagrun.py:795} INFO - Marking run <DagRun ClickhouseQueryFlow @ 2024-09-12 15:26:04.146014+00:00: manual__2024-09-12T15:26:04.146014+00:00, state:running, queued_at: 2024-09-12 15:26:04.163666+00:00. externally triggered: True> successful
[2024-09-12T15:26:04.653+0000] {dagrun.py:846} INFO - DagRun Finished: dag_id=ClickhouseQueryFlow, execution_date=2024-09-12 15:26:04.146014+00:00, run_id=manual__2024-09-12T15:26:04.146014+00:00, run_start_date=2024-09-12 15:26:04.625967+00:00, run_end_date=2024-09-12 15:26:04.652683+00:00, run_duration=0.026716, state=success, external_trigger=True, run_type=manual, data_interval_start=2024-09-12 15:26:04.146014+00:00, data_interval_end=2024-09-12 15:26:04.146014+00:00, dag_hash=99a202d434a72e4869ac27050f19a2b6
[2024-09-12T15:26:13.248+0000] {kubernetes_executor_utils.py:121} ERROR - Unknown error in KubernetesJobWatcher. Failing
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 761, in _update_chunk_length
    self.chunk_left = int(line, 16)
ValueError: invalid literal for int() with base 16: b''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 444, in _error_catcher
    yield
  File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 828, in read_chunked
    self._update_chunk_length()
  File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 765, in _update_chunk_length
    raise InvalidChunkLength(self, line)
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 112, in run
    self.resource_version = self._run(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 168, in _run
    for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/watch/watch.py", line 178, in stream
    for line in iter_resp_lines(resp):
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for segment in resp.stream(amt=None, decode_content=False):
  File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 624, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 857, in read_chunked
    self._original_response.close()
  File "/usr/local/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
Process KubernetesJobWatcher-3038:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 761, in _update_chunk_length
    self.chunk_left = int(line, 16)
ValueError: invalid literal for int() with base 16: b''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 444, in _error_catcher
    yield
  File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 828, in read_chunked
    self._update_chunk_length()
  File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 765, in _update_chunk_length
    raise InvalidChunkLength(self, line)
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 112, in run
    self.resource_version = self._run(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 168, in _run
    for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/watch/watch.py", line 178, in stream
    for line in iter_resp_lines(resp):
  File "/home/airflow/.local/lib/python3.9/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for segment in resp.stream(amt=None, decode_content=False):
  File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 624, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 857, in read_chunked
    self._original_response.close()
  File "/usr/local/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/airflow/.local/lib/python3.9/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
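Both tracebacks fail at the same point: while streaming the Kubernetes watch response, urllib3 reads the next chunk-size line of the chunked HTTP body, gets b'' (end of stream, meaning the connection was closed by the API server or something in between), and int(b'', 16) raises ValueError, which is re-raised as InvalidChunkLength and then as ProtocolError. A simplified, stdlib-only sketch of HTTP/1.1 chunked decoding that reproduces this failure; read_chunked and InvalidChunkLength here are local illustrations, not urllib3's actual code:

```python
import io


class InvalidChunkLength(Exception):
    """Local stand-in for urllib3.exceptions.InvalidChunkLength (illustration only)."""


def read_chunked(stream):
    """Decode an HTTP/1.1 chunked body from a binary file-like object.

    Simplified: each chunk is b"<hex size>\\r\\n<data>\\r\\n" and a size of 0
    ends the body. Trailer headers are not supported.
    """
    body = bytearray()
    while True:
        line = stream.readline()  # b"" means end of stream: the connection was closed
        size_field = line.split(b";", 1)[0]  # drop any chunk extensions
        try:
            # Same call as in the traceback; int(b"", 16) raises ValueError.
            size = int(size_field, 16)
        except ValueError as exc:
            raise InvalidChunkLength(f"got length {line!r}") from exc
        if size == 0:
            stream.readline()  # consume the blank line that ends the body
            return bytes(body)
        body += stream.read(size)
        stream.readline()  # consume the CRLF after the chunk data


if __name__ == "__main__":
    print(read_chunked(io.BytesIO(b"4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n")))
    try:
        # Stream cut off where the next size line should be.
        read_chunked(io.BytesIO(b"5\r\nhello\r\n"))
    except InvalidChunkLength as exc:
        print("InvalidChunkLength:", exc, "caused by", repr(exc.__cause__))
```

The truncated input mimics a watch connection that drops between chunks; the decoder cannot tell "no more data" from "connection lost" except by the missing terminating 0-size chunk.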
[2024-09-12T15:26:13.335+0000] {kubernetes_executor_utils.py:387} ERROR - Error while health checking kube watcher process for namespace airflow. Process died for unknown reasons
[2024-09-12T15:26:13.351+0000] {kubernetes_executor_utils.py:157} INFO - Event: and now my watch begins starting at resource_version: 0
ancient-application-36103
09/12/2024, 3:28 PM
refined-zebra-23040
09/12/2024, 3:31 PM
ancient-application-36103
09/12/2024, 3:32 PM
refined-zebra-23040
09/12/2024, 3:34 PM
ancient-application-36103
09/12/2024, 3:35 PM
refined-zebra-23040
09/12/2024, 3:41 PM
from metaflow import FlowSpec, step, environment, kubernetes, conda
import os


class ClickhouseQueryFlow(FlowSpec):

    @environment(vars={
        "CLICKHOUSE_HOST": os.getenv("CLICKHOUSE_HOST"),
        "CLICKHOUSE_USER": os.getenv("CLICKHOUSE_USER"),
        "CLICKHOUSE_PASSWORD": os.getenv("CLICKHOUSE_PASSWORD"),
        "CLICKHOUSE_PORT": os.getenv("CLICKHOUSE_PORT"),
    })
    # @kubernetes
    # @pypi(packages={'clickhouse-connect': '0.7.19'})
    @conda(packages={'clickhouse-connect': '0.7.19'})
    @step
    def start(self):
        """
        This step connects to ClickHouse and executes a query.
        """
        # Import inside the step so the package comes from the @conda environment.
        import clickhouse_connect

        self.host = os.getenv('CLICKHOUSE_HOST')
        self.user = os.getenv('CLICKHOUSE_USER')
        # Keep the password in a local variable: every self.* attribute is
        # persisted as a Metaflow artifact.
        password = os.getenv('CLICKHOUSE_PASSWORD')
        self.port = os.getenv('CLICKHOUSE_PORT')

        # Initialize ClickHouse connection
        client = clickhouse_connect.get_client(
            host=self.host,
            username=self.user,
            password=password,
            port=self.port,
            secure=True,
        )

        # Example query: fetching some data
        query = "SELECT * FROM system.tables;"
        self.results = client.query(query).result_rows
        print("Query executed successfully!")
        print(self.results)
        self.next(self.process_results)

    # @kubernetes
    @step
    def process_results(self):
        """
        Process the results fetched from ClickHouse.
        """
        # For this example, we'll just print the results.
        print("Processing results...")
        for row in self.results:
            print(row)
        self.next(self.end)

    @step
    def end(self):
        print("Flow completed!")


if __name__ == '__main__':
    ClickhouseQueryFlow()
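The @environment decorator in the flow above copies the CLICKHOUSE_* values with os.getenv, which returns None for any variable that is not set, and every environment value is a string, the port included. A minimal sketch of a hypothetical helper, load_clickhouse_settings (not part of Metaflow or clickhouse-connect), that checks all four variables up front and converts the port to an int before building the keyword arguments the flow already passes to get_client:

```python
import os
from typing import Mapping, Optional


def load_clickhouse_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Hypothetical helper: build get_client kwargs from CLICKHOUSE_* variables.

    Fails fast with a clear message instead of passing None values to the client.
    """
    env = os.environ if env is None else env
    required = ("CLICKHOUSE_HOST", "CLICKHOUSE_USER", "CLICKHOUSE_PASSWORD", "CLICKHOUSE_PORT")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return {
        "host": env["CLICKHOUSE_HOST"],
        "username": env["CLICKHOUSE_USER"],
        "password": env["CLICKHOUSE_PASSWORD"],
        "port": int(env["CLICKHOUSE_PORT"]),  # environment values are always strings
        "secure": True,  # same TLS setting the flow uses
    }
```

Inside start, the client could then be created with clickhouse_connect.get_client(**load_clickhouse_settings()), so a missing variable in the task's environment shows up as one readable error in the step log.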
refined-zebra-23040
09/12/2024, 3:41 PM
ancient-application-36103
09/12/2024, 3:43 PM
refined-zebra-23040
09/12/2024, 4:36 PM