How to build an ETL pipeline with Python | Data pipeline | Export from SQL Server to PostgreSQL
Building an ETL (Extract, Transform, Load) pipeline means extracting data from a source, transforming it, and loading it into a destination. To export data from SQL Server to PostgreSQL, Python offers several well-established libraries that cover each step. Here is a general overview of the process:
Extract data from SQL Server
- Use a Python library such as pyodbc or SQLAlchemy to connect to SQL Server and retrieve the necessary data (a minimal connection sketch follows below).
- Write SQL queries to select the data you need from the SQL Server database.
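For the extraction step, a minimal sketch using SQLAlchemy and pandas might look like the following; the host, database, credentials, and table name are placeholders you would replace with your own, and the mssql+pyodbc dialect assumes the pyodbc package and a matching ODBC driver are installed:

import pandas as pd
from sqlalchemy import create_engine

# Placeholder connection details -- replace with your own SQL Server settings
engine = create_engine(
    'mssql+pyodbc://<sql_server_username>:<sql_server_password>@<sql_server_host>/<sql_server_database>'
    '?driver=ODBC+Driver+17+for+SQL+Server'
)

# Select only the rows and columns you actually need
query = 'SELECT * FROM <table_name>'
data = pd.read_sql(query, engine)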
Transform data
- Use pandas, a Python data analysis library, to manipulate and clean the data as needed.
- Perform any necessary data type conversions, filtering, or aggregation, as sketched below.
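For illustration, the snippet below shows a few typical pandas transformations. It assumes data is the DataFrame produced by the extraction step, and the column names (order_date, quantity, price) are purely hypothetical:

# Hypothetical column names -- adapt to your own schema and rules
data['order_date'] = pd.to_datetime(data['order_date'])             # data type conversion
data = data[data['quantity'] > 0]                                    # filtering
data['total'] = data['quantity'] * data['price']                     # derived column
summary = data.groupby('order_date', as_index=False)['total'].sum()  # aggregation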
Load data into PostgreSQL
- Use a Python library such as psycopg2 or SQLAlchemy to connect to PostgreSQL and insert the transformed data (a one-step alternative using pandas is sketched after this list).
- Write SQL queries to create the necessary tables in the PostgreSQL database.
- Map the columns of the SQL Server data to the appropriate columns in the PostgreSQL tables.
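If you also use SQLAlchemy on the PostgreSQL side, pandas can create the table and append the rows in a single call to DataFrame.to_sql, as sketched below with placeholder connection details; the full script that follows shows the more explicit psycopg2 approach instead.

from sqlalchemy import create_engine

# Placeholder PostgreSQL connection details
pg_engine = create_engine('postgresql+psycopg2://<pg_username>:<pg_password>@<pg_host>/<pg_database>')

# to_sql creates the table from the DataFrame's columns if it does not exist
# and appends the rows; if_exists can also be 'fail' or 'replace'
data.to_sql('<table_name>', pg_engine, if_exists='append', index=False)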
Here is an example Python script that demonstrates how to export data from SQL Server to PostgreSQL:
import pandas as pd
import pyodbc
import psycopg2
# SQL Server connection details
server = '<sql_server_host>'
database = '<sql_server_database>'
username = '<sql_server_username>'
password = '<sql_server_password>'
# PostgreSQL connection details
pg_host = '<pg_host>'
pg_database = '<pg_database>'
pg_username = '<pg_username>'
pg_password = '<pg_password>'
# SQL query to select data from SQL Server
query = 'SELECT * FROM <table_name>'
# Connect to SQL Server and retrieve data
conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + server + ';DATABASE=' + database + ';UID=' + username + ';PWD=' + password)
data = pd.read_sql(query, conn)
# Transform data as needed
# ...
# Connect to PostgreSQL and insert transformed data
pg_conn = psycopg2.connect(host=pg_host, database=pg_database, user=pg_username, password=pg_password)
cur = pg_conn.cursor()
# Create the PostgreSQL table
create_table_query = '''
CREATE TABLE <table_name> (
<column_1_name> <column_1_data_type>,
<column_2_name> <column_2_data_type>,
...
);
'''
cur.execute(create_table_query)
# Map SQL Server columns to PostgreSQL columns
column_mapping = {
    '<sql_server_column_1>': '<postgres_column_1>',
    '<sql_server_column_2>': '<postgres_column_2>',
    # ...
}
# Insert the data into the PostgreSQL table
insert_query = f"INSERT INTO <table_name> ({','.join(column_mapping.values())}) VALUES ({','.join(['%s'] * len(column_mapping))})"
for index, row in data.iterrows():
    # Look up each value by its SQL Server column name (the mapping's keys)
    cur.execute(insert_query, [row[sql_column] for sql_column in column_mapping])
# Commit the transaction and close both connections
pg_conn.commit()
cur.close()
pg_conn.close()
conn.close()
Note that this is just a simple example to illustrate the basic process. The exact implementation may vary depending on the specific requirements and circumstances of your project.
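Inserting one row per execute call, as in the loop above, can be slow for large tables. One common optimisation, sketched here with the same placeholder names, is to batch the rows with psycopg2.extras.execute_values:

from psycopg2.extras import execute_values

# Build a list of tuples, taking values by their SQL Server column names (the mapping's keys)
rows = [tuple(row[sql_column] for sql_column in column_mapping) for _, row in data.iterrows()]
insert_query = f"INSERT INTO <table_name> ({','.join(column_mapping.values())}) VALUES %s"
execute_values(cur, insert_query, rows, page_size=1000)
pg_conn.commit()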
Here is a second, simpler example that reads data from a CSV file, adds a derived column, and loads the result into PostgreSQL:

import pandas as pd
import psycopg2

# Read the CSV file into a DataFrame
data = pd.read_csv('data.csv')

# Transform the data as needed
data['total_sales'] = data['quantity'] * data['price']

# Connect to the PostgreSQL database
conn = psycopg2.connect(database='mydatabase', user='myuser', password='mypassword', host='localhost', port='5432')
cur = conn.cursor()

# Create the table in the database
cur.execute('CREATE TABLE sales (id SERIAL PRIMARY KEY, date DATE, product TEXT, quantity INTEGER, price NUMERIC, total_sales NUMERIC)')

# Insert the data into the table
for index, row in data.iterrows():
    cur.execute('INSERT INTO sales (date, product, quantity, price, total_sales) VALUES (%s, %s, %s, %s, %s)', (row['date'], row['product'], row['quantity'], row['price'], row['total_sales']))

# Commit the transaction and close the connection
conn.commit()
cur.close()
conn.close()
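As a small variation on the CSV example, psycopg2 connections and cursors can be used as context managers: the with block commits the transaction on success and rolls it back on error, and the cursor is closed automatically, although the connection itself still has to be closed explicitly. A short sketch, assuming the sales table already exists:

import pandas as pd
import psycopg2

data = pd.read_csv('data.csv')
data['total_sales'] = data['quantity'] * data['price']

conn = psycopg2.connect(database='mydatabase', user='myuser', password='mypassword', host='localhost', port='5432')
with conn:                      # commits on success, rolls back on error
    with conn.cursor() as cur:  # cursor is closed when the block exits
        for index, row in data.iterrows():
            cur.execute('INSERT INTO sales (date, product, quantity, price, total_sales) VALUES (%s, %s, %s, %s, %s)',
                        (row['date'], row['product'], row['quantity'], row['price'], row['total_sales']))
conn.close()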