Python 提供了豐富的工具和函式函式庫,方便處理 CSV 和 JSON 等常見資料格式。利用內建的 csv 模組,可以有效讀寫 CSV 檔案,而 pandas 函式函式庫則提供了更進階的資料處理能力,例如使用 DataFrame 進行資料操作和分析。此外,結合 Faker 函式函式庫,可以輕鬆生成模擬資料,方便測試和開發。在實際應用中,可以利用 Apache Airflow 建立自動化的資料處理管線,將 CSV 資料轉換為 JSON 格式,並進行後續的資料分析和應用。

寫入 CSV 檔案

要寫入 CSV 檔案,需要先開啟檔案並指定寫入模式。然後,建立一個 csv.writer 物件來寫入資料。以下是範例程式碼:

import csv
from faker import Faker

fake = Faker()
header = ['name', 'age', 'street', 'city', 'state', 'zip', 'lng', 'lat']

with open('data.csv', 'w', newline='') as output:
    mywriter = csv.writer(output)
    mywriter.writerow(header)
    for r in range(1000):
        mywriter.writerow([fake.name(), fake.random_int(min=18, max=80, step=1), fake.street_address(), fake.city(), fake.state(), fake.zipcode(), fake.longitude(), fake.latitude()])

這段程式碼會建立一個名為 data.csv 的檔案,並寫入 1000 行隨機資料。

讀取 CSV 檔案

讀取 CSV 檔案的步驟與寫入相似,但需要使用 csv.DictReader 來讀取檔案。以下是範例程式碼:

import csv

with open('data.csv', 'r') as f:
    myreader = csv.DictReader(f)
    headers = next(myreader)
    for row in myreader:
        print(row['name'], row['age'])

這段程式碼會讀取 data.csv 檔案,並印出每行的 nameage 欄位。

CSV 讀寫注意事項

  • 使用 with 關鍵字開啟檔案,確保檔案在使用完畢後會自動關閉。
  • 指設定檔案模式,例如 w 為寫入模式,r 為讀取模式。
  • 使用 csv.writercsv.DictReader 來寫入和讀取 CSV 檔案。
  • 使用 next() 函式來取得 CSV 檔案的標題欄位。

DictReader 的優點

  • 可以使用欄位名稱來存取資料,例如 row['name']
  • 提供了一種更直觀的方式來存取 CSV 檔案的資料。

讀寫 CSV 檔案的最佳實踐

  • 使用 with 關鍵字開啟檔案,確保檔案在使用完畢後會自動關閉。
  • 指設定檔案模式,例如 w 為寫入模式,r 為讀取模式。
  • 使用 csv.writercsv.DictReader 來寫入和讀取 CSV 檔案。
  • 使用 next() 函式來取得 CSV 檔案的標題欄位。

使用Python讀寫CSV檔案

CSV(Comma Separated Values)是一種常見的檔案格式,廣泛用於資料交換和儲存。Python提供了多種方法來讀寫CSV檔案,包括使用內建的csv模組和第三方函式庫pandas

使用內建的csv模組

內建的csv模組提供了一個簡單的方式來讀寫CSV檔案。以下是使用csv模組讀取CSV檔案的範例:

import csv

with open('data.csv', 'r') as csvfile:
    myreader = csv.DictReader(csvfile)
    for row in myreader:
        print(row['name'])

這段程式碼使用csv.DictReader類別來讀取CSV檔案,並將每一行資料轉換成一個字典。然後,程式碼使用for迴圈來迭代每一行資料,並印出name欄位的值。

使用pandas函式庫

pandas函式庫是一個強大的資料處理函式庫,提供了多種方法來讀寫CSV檔案。以下是使用pandas函式庫讀取CSV檔案的範例:

import pandas as pd

df = pd.read_csv('data.csv')
print(df.head(10))

這段程式碼使用pd.read_csv函式來讀取CSV檔案,並將資料轉換成一個DataFrame物件。然後,程式碼使用head方法來印出前10行資料。

比較csv模組和pandas函式庫

csv模組和pandas函式庫都可以用來讀寫CSV檔案,但是pandas函式庫提供了更多的功能和效率。以下是兩者的比較:

  • csv模組:
    • 較小的記憶體佔用
    • 較簡單的程式碼
    • 只能讀寫CSV檔案
  • pandas函式庫:
    • 較大的記憶體佔用
    • 較複雜的程式碼
    • 可以讀寫多種檔案格式,包括CSV、Excel、JSON等
    • 提供了更多的資料處理功能,包括資料篩選、分組、合併等

Python 中的檔案寫入和讀取

Python 是一種強大的語言,能夠輕鬆地處理各種檔案格式。下面,我們將探討如何在 Python 中建立和寫入 CSV 和 JSON 檔案。

建立 DataFrame

要建立一個 DataFrame,我們需要先建立一個字典(dictionary),這是一種儲存資料的資料結構,使用鍵值對(key-value pair)來儲存資料。字典的值可以是任何 Python 資料型別,例如陣列(array)。

import pandas as pd

# 建立字典
data = {'Name': ['Paul', 'Bob', 'Susan', 'Yolanda'],
        'Age': [23, 45, 18, 21]}

# 建立 DataFrame
df = pd.DataFrame(data)

寫入 CSV 檔案

要將 DataFrame 寫入 CSV 檔案,我們可以使用 to_csv() 方法,並傳入檔名。預設情況下,行名稱(row names)將被寫入檔案,如果不需要行名稱,可以傳入 index=False 引數。

# 寫入 CSV 檔案
df.to_csv('fromdf.csv', index=False)

寫入 JSON 檔案

JSON(JavaScript Object Notation)是一種常見的資料格式,尤其是在 API 呼叫中。Python 有一個標準函式庫叫做 json,可以用於處理 JSON 資料。

import json

# 建立 JSON 資料
json_data = {'Name': ['Paul', 'Bob', 'Susan', 'Yolanda'],
             'Age': [23, 45, 18, 21]}

# 寫入 JSON 檔案
with open('data.json', 'w') as f:
    json.dump(json_data, f)

讀取 JSON 檔案

要讀取 JSON 檔案,可以使用 json.load() 方法。

# 讀取 JSON 檔案
with open('data.json', 'r') as f:
    json_data = json.load(f)

使用Python建立和讀取JSON檔

JSON(JavaScript Object Notation)是一種輕量級的資料交換格式,廣泛用於網路應用程式中。Python提供了json模組來處理JSON資料。在本節中,我們將介紹如何使用Python建立和讀取JSON檔。

建立JSON檔

要建立JSON檔,首先需要匯入json模組和Faker類別,然後使用Faker生成假資料。以下是建立JSON檔的步驟:

  1. 匯入必要的模組:

import json from faker import Faker

2. 建立一個`Faker`物件:
   ```python
fake = Faker()
  1. 開啟一個檔案以寫入模式:

output = open(‘data.json’, ‘w’)

4. 建立一個字典來儲存資料:
   ```python
alldata = {}
alldata['records'] = []
  1. 使用Faker生成假資料,並將其加入字典中:

for x in range(1000): data = { “name”: fake.name(), “age”: fake.random_int(min=18, max=80, step=1), “street”: fake.street_address(), “city”: fake.city(), “state”: fake.state(), “zip”: fake.zipcode(), “lng”: float(fake.longitude()), “lat”: float(fake.latitude()) } alldata[‘records’].append(data)

6. 使用`json.dump()`方法將字典寫入檔案:
   ```python
json.dump(alldata, output)

讀取JSON檔

要讀取JSON檔,需要使用json.load()方法。以下是讀取JSON檔的步驟:

  1. 開啟檔案以讀取模式:

with open(“data.json”, “r”) as f:

2. 使用`json.load()`方法將JSON資料讀入字典
   ```python
data = json.load(f)

這樣就可以讀取JSON檔的內容了。

範例程式碼

以下是完整的範例程式碼:

import json
from faker import Faker

# 建立假資料
fake = Faker()
output = open('data.json', 'w')
alldata = {}
alldata['records'] = []

for x in range(1000):
    data = {
        "name": fake.name(),
        "age": fake.random_int(min=18, max=80, step=1),
        "street": fake.street_address(),
        "city": fake.city(),
        "state": fake.state(),
        "zip": fake.zipcode(),
        "lng": float(fake.longitude()),
        "lat": float(fake.latitude())
    }
    alldata['records'].append(data)

json.dump(alldata, output)

# 讀取JSON檔
with open("data.json", "r") as f:
    data = json.load(f)

print(data)

這個範例程式碼會建立一個包含1000條假資料的JSON檔,然後讀取這個檔案並印出其內容。

JSON 資料檢查與處理

在處理 JSON 資料時,瞭解如何正確地載入和轉換資料至 pandas DataFrame 是非常重要的。以下是玄貓對 JSON 資料檢查和處理的步驟。

載入 JSON 資料

首先,載入 JSON 資料可以使用 json 函式函式庫的 loads() 方法。這個方法可以將 JSON 字串轉換為 Python 物件。

import json

# 載入 JSON 資料
with open('data.json', 'r') as f:
    data = json.load(f)

檢查 JSON 資料

載入 JSON 資料後,可以使用 data['records'][0] 來檢查第一筆資料。或者,可以使用 data['records'][0]['name'] 來檢查第一筆資料的名稱欄位。

# 檢查第一筆資料
print(data['records'][0])

# 檢查第一筆資料的名稱欄位
print(data['records'][0]['name'])

轉換 JSON 資料至 pandas DataFrame

如果 JSON 資料是清晰且格式良好的,可以使用 pd.read_json() 方法直接載入至 pandas DataFrame。

import pandas as pd

# 載入 JSON 資料至 pandas DataFrame
df = pd.read_json('data.json')

但是,如果 JSON 資料是巢狀結構,例如 records 字典中的資料,就需要額外的步驟來載入。

# 載入 JSON 資料
with open('data.json', 'r') as f:
    data = json.load(f)

# 正規化 JSON 資料
df = pd.json_normalize(data, record_path='records')

寫入 JSON 資料

寫入 JSON 資料可以使用 to_json() 方法。可以指定 orient 引數來決定 JSON 資料的格式。

# 寫入 JSON 資料
df.to_json('output.json', orient='records')

圖表翻譯

以下是使用 Mermaid 圖表來視覺化 JSON 資料的載入和轉換過程:

  graph LR
    A[JSON 資料] -->|載入|> B[pandas DataFrame]
    B -->|轉換|> C[JSON 資料]
    C -->|寫入|> D[JSON 檔案]

圖表翻譯:

此圖表展示了 JSON 資料的載入、轉換和寫入過程。首先,JSON 資料被載入至 pandas DataFrame。然後,DataFrame 被轉換為 JSON 資料。最後,JSON 資料被寫入至 JSON 檔案。

使用 Apache Airflow 建立資料管線

Apache Airflow 是一個強大的工具,能夠幫助您建立和管理資料管線。它使用 Python 函式和 Bash 或其他操作員來建立任務,這些任務可以組合成一個有向非迴圈圖(DAG)。這意味著每個任務都會在完成後移動到下一個任務。

建立 CSV 到 JSON 資料管線

首先,讓我們建立一個簡單的 DAG,以瞭解 Airflow 的工作原理。這個 DAG 會列印預出一條訊息,然後讀取 CSV 檔案並列印預出所有名稱的清單。

步驟 1:匯入必要的函式庫

import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import pandas as pd

步驟 2:定義 CSV 到 JSON 函式

def CSVToJson():
    df = pd.read_csv('/home/paulcrickard/data.csv')
    df.to_json('/home/paulcrickard/data.json', orient='records')

步驟 3:建立 DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': dt.datetime(2023, 3, 20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'csv_to_json',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

bash_task = BashOperator(
    task_id='print_message',
    bash_command='echo "Hello World!"',
    dag=dag,
)

python_task = PythonOperator(
    task_id='csv_to_json',
    python_callable=CSVToJson,
    dag=dag,
)

bash_task >> python_task

執行 DAG

現在,您可以使用 Airflow 的 GUI 來執行和監控您的 DAG。只要點選「Trigger DAG」按鈕,就可以啟動 DAG 的執行。

結果

您的 DAG 會列印預出一條訊息,然後讀取 CSV 檔案並將其轉換為 JSON 檔案。JSON 檔案將包含所有名稱的清單,格式如下:

[
    {"name": "Henry Lee", "age": 42, "street": "57850 Zachary Camp", "city": "Lake Jonathon", "state": "Rhode Island", "zip": "93363", "lng": -161.561209, "lat": 72.086145},
    {"name": "Corey Combs DDS", "age": 43, "street": "60066 Ruiz Plaza Apt. 752", "city": "East Kaitlin", "state": "Alabama", "zip": "16297", "lng": 123.894456, "lat": -50.211986}
]

這就是使用 Apache Airflow 建立 CSV 到 JSON 資料管線的基本步驟。您可以根據自己的需求自訂 DAG 和任務,以建立更複雜的資料管線。

Apache Airflow資料管線建立

資料管線概述

Apache Airflow是一個強大的工作流程管理系統,能夠幫助您建立、管理和監控資料管線。在本節中,我們將建立一個簡單的資料管線,負責從CSV檔案中讀取資料,然後將其轉換為JSON檔案。

步驟1:讀取CSV檔案

首先,我們需要讀取CSV檔案並將其轉換為Pandas DataFrame。以下是示例程式碼:

import pandas as pd

# 讀取CSV檔案
df = pd.read_csv('data.csv')

# 列印每行資料的名稱
for i, r in df.iterrows():
    print(r['name'])

# 將DataFrame轉換為JSON檔案
df.to_json('data.json', orient='records')

這段程式碼讀取名為data.csv的CSV檔案,然後將其轉換為Pandas DataFrame。接著,它列印每行資料的名稱,最後將DataFrame轉換為JSON檔案並儲存為data.json

步驟2:建立Airflow DAG

接下來,我們需要建立Airflow DAG(Directed Acyclic Graph)以管理資料管線。以下是示例程式碼:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'paulcrickard',
    'start_date': datetime(2020, 3, 18),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'MyCSVDAG',
    default_args=default_args,
    schedule_interval='@daily',
)

這段程式碼建立了一個名為MyCSVDAG的DAG,具有預設引數,包括所有者、開始日期、重試次數和重試延遲時間。DAG的排程間隔設為每日執行一次。

步驟3:定義Airflow任務

最後,我們需要定義Airflow任務以執行資料管線。以下是示例程式碼:

def read_csv(**kwargs):
    # 讀取CSV檔案
    df = pd.read_csv('data.csv')

    # 列印每行資料的名稱
    for i, r in df.iterrows():
        print(r['name'])

    # 將DataFrame轉換為JSON檔案
    df.to_json('data.json', orient='records')

task = PythonOperator(
    task_id='read_csv',
    python_callable=read_csv,
    dag=dag,
)

這段程式碼定義了一個名為read_csv的任務,負責讀取CSV檔案,列印每行資料的名稱,然後將DataFrame轉換為JSON檔案。任務使用PythonOperator運運算元執行。

Airflow 中的 DAG 計劃與任務定義

在 Airflow 中,DAG(Directed Acyclic Graph)是一個有向非迴圈圖,代表了一系列任務之間的依賴關係。定義一個 DAG 需要指定其 start_dateschedule_interval,這兩個引數決定了 DAG 的執行時間。

例如,若要建立一個每 5 分鐘執行一次的 DAG,可以使用以下程式碼:

from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 21),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'MyCSVDAG',
    default_args=default_args,
    schedule_interval=timedelta(minutes=5),
) as dag:
    # 任務定義
    pass

在這個例子中,start_date 設定為 2023 年 3 月 21 日,schedule_interval 設定為每 5 分鐘執行一次。

任務定義

Airflow 提供了多種預建的運運算元(Operator),可以用來定義任務。常用的運運算元包括 BashOperator、PythonOperator 和 PostgresOperator。

以下是使用 BashOperator 和 PythonOperator 定義任務的例子:

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def CSVToJson():
    # 讀取 CSV 檔案並轉換為 JSON
    pass

print_starting = BashOperator(
    task_id='starting',
    bash_command='echo "I am reading the CSV now....."'
)

CSVJson = PythonOperator(
    task_id='convertCSVtoJson',
    python_callable=CSVToJson
)

在這個例子中,print_starting 任務使用 BashOperator 執行一個 bash 命令,CSVJson 任務使用 PythonOperator 執行一個 Python 函式 CSVToJson

任務連線

定義了任務之後,需要將它們連線起來,以指定任務之間的依賴關係。可以使用 >> 運運算元來連線任務:

print_starting >> CSVJson

這個連線指定了 print_starting 任務必須在 CSVJson 任務之前執行。

讀寫檔案

在 Airflow 中,可以使用 FileSensor 運運算元來讀寫檔案。以下是使用 FileSensor 讀取 CSV 檔案的例子:

from airflow.sensors.filesystem import FileSensor

file_sensor = FileSensor(
    task_id='check_file',
    filepath='/path/to/data.csv'
)

這個例子中,file_sensor 任務會等待 /path/to/data.csv 檔案出現後才會繼續執行。

可以使用 PythonOperator 執行 Python 函式來讀寫檔案。以下是使用 PythonOperator 讀取 CSV 檔案的例子:

import pandas as pd

def read_csv():
    df = pd.read_csv('/path/to/data.csv')
    # 處理 DataFrame
    pass

read_csv_task = PythonOperator(
    task_id='read_csv',
    python_callable=read_csv
)

這個例子中,read_csv 函式會讀取 /path/to/data.csv 檔案並傳回一個 Pandas DataFrame。

Apache Airflow 中的 DAG 設計與執行

在 Apache Airflow 中,DAG(Directed Acyclic Graph)是用於定義工作流程的核心概念。以下是如何使用 Airflow 建立和執行 DAG 的步驟:

從技術架構視角來看,Python 提供了強大的工具和函式函式庫,例如內建的 csv 模組和 pandas 函式庫,簡化了 CSV 和 JSON 檔案的讀寫操作。csv 模組適用於輕量級的 CSV 檔案處理,而 pandas 則提供了更豐富的資料處理功能,尤其適用於大型資料集。然而,pandas 的記憶體佔用較大,需要根據實際情況權衡選擇。此外,DictReader 提供了更直觀的資料存取方式,提升了程式碼的可讀性。對於 JSON 檔案的處理,Python 的 json 模組提供了便捷的讀寫功能,並可透過 pd.json_normalize() 處理巢狀結構的 JSON 資料。Apache Airflow 則進一步提升了資料管線的管理效率,透過 DAG 定義工作流程,並利用 BashOperatorPythonOperator 等執行不同型別的任務,實作了自動化的資料處理流程。然而,Airflow 的學習曲線較陡峭,需要投入時間學習其核心概念和使用方法。對於重視長期穩定性的企業,建議逐步匯入 Airflow,並從簡單的資料管線開始,逐步提升其複雜度。隨著 Airflow 生態系統的日趨成熟,我們預見其應用門檻將大幅降低,成為資料工程領域不可或缺的工具。