millions-queen-95930
02/17/2024, 3:38 AMfrom metaflow import FlowSpec
from metaflow import step
from my_pkg.library.databases import initialize_database
from my_pkg.library.ssh import SshTunnel
from my_pkg.settings.databases import DatabaseConfiguration
class TunnelingPipelineExample(FlowSpec):
@step
def start(self):
self.next(self.query_tables)
@step
def query_tables(self):
db, cursor = initialize_database()
cursor.execute("USE some_database;")
cursor.execute("SELECT * FROM some_table;")
for result in cursor.fetchall():
print(f"RESULT: {result}")
self.next(self.end)
@step
def end(self):
print(f"Completed pipeline!")
if __name__ == "__main__":
with SshTunnel(DatabaseConfiguration()):
TunnelingPipelineExample()
## Working Example
from my_pkg.library.databases import initialize_database
from my_pkg.library.ssh import SshTunnel
from my_pkg.settings.databases import ApplicationDatabaseUsa
if __name__ == "__main__":
with SshTunnel(DatabaseConfiguration()):
db, cursor = initialize_database()
cursor.execute("USE some_database;")
cursor.execute("SELECT * FROM some_table;")
for result in cursor.fetchall():
print(f"RESULT: {result}")
The same code succeeds without Metaflow, so I assume Metaflow is somehow causing the context manager to get called twice given the error message says the tunnel is already in use.