Python 提供了豐富的工具和函式函式庫,方便處理 CSV 和 JSON 等常見資料格式。利用內建的 csv
模組,可以有效讀寫 CSV 檔案,而 pandas
函式函式庫則提供了更進階的資料處理能力,例如使用 DataFrame 進行資料操作和分析。此外,結合 Faker
函式函式庫,可以輕鬆生成模擬資料,方便測試和開發。在實際應用中,可以利用 Apache Airflow 建立自動化的資料處理管線,將 CSV 資料轉換為 JSON 格式,並進行後續的資料分析和應用。
寫入 CSV 檔案
要寫入 CSV 檔案,需要先開啟檔案並指定寫入模式。然後,建立一個 csv.writer
物件來寫入資料。以下是範例程式碼:
import csv
from faker import Faker
fake = Faker()
header = ['name', 'age', 'street', 'city', 'state', 'zip', 'lng', 'lat']
with open('data.csv', 'w', newline='') as output:
mywriter = csv.writer(output)
mywriter.writerow(header)
for r in range(1000):
mywriter.writerow([fake.name(), fake.random_int(min=18, max=80, step=1), fake.street_address(), fake.city(), fake.state(), fake.zipcode(), fake.longitude(), fake.latitude()])
這段程式碼會建立一個名為 data.csv
的檔案,並寫入 1000 行隨機資料。
讀取 CSV 檔案
讀取 CSV 檔案的步驟與寫入相似,但需要使用 csv.DictReader
來讀取檔案。以下是範例程式碼:
import csv
with open('data.csv', 'r') as f:
myreader = csv.DictReader(f)
headers = next(myreader)
for row in myreader:
print(row['name'], row['age'])
這段程式碼會讀取 data.csv
檔案,並印出每行的 name
和 age
欄位。
CSV 讀寫注意事項
- 使用
with
關鍵字開啟檔案,確保檔案在使用完畢後會自動關閉。 - 指設定檔案模式,例如
w
為寫入模式,r
為讀取模式。 - 使用
csv.writer
和csv.DictReader
來寫入和讀取 CSV 檔案。 - 使用
next()
函式來取得 CSV 檔案的標題欄位。
DictReader 的優點
- 可以使用欄位名稱來存取資料,例如
row['name']
。 - 提供了一種更直觀的方式來存取 CSV 檔案的資料。
讀寫 CSV 檔案的最佳實踐
- 使用
with
關鍵字開啟檔案,確保檔案在使用完畢後會自動關閉。 - 指設定檔案模式,例如
w
為寫入模式,r
為讀取模式。 - 使用
csv.writer
和csv.DictReader
來寫入和讀取 CSV 檔案。 - 使用
next()
函式來取得 CSV 檔案的標題欄位。
使用Python讀寫CSV檔案
CSV(Comma Separated Values)是一種常見的檔案格式,廣泛用於資料交換和儲存。Python提供了多種方法來讀寫CSV檔案,包括使用內建的csv
模組和第三方函式庫pandas
。
使用內建的csv
模組
內建的csv
模組提供了一個簡單的方式來讀寫CSV檔案。以下是使用csv
模組讀取CSV檔案的範例:
import csv
with open('data.csv', 'r') as csvfile:
myreader = csv.DictReader(csvfile)
for row in myreader:
print(row['name'])
這段程式碼使用csv.DictReader
類別來讀取CSV檔案,並將每一行資料轉換成一個字典。然後,程式碼使用for
迴圈來迭代每一行資料,並印出name
欄位的值。
使用pandas
函式庫
pandas
函式庫是一個強大的資料處理函式庫,提供了多種方法來讀寫CSV檔案。以下是使用pandas
函式庫讀取CSV檔案的範例:
import pandas as pd
df = pd.read_csv('data.csv')
print(df.head(10))
這段程式碼使用pd.read_csv
函式來讀取CSV檔案,並將資料轉換成一個DataFrame
物件。然後,程式碼使用head
方法來印出前10行資料。
比較csv
模組和pandas
函式庫
csv
模組和pandas
函式庫都可以用來讀寫CSV檔案,但是pandas
函式庫提供了更多的功能和效率。以下是兩者的比較:
csv
模組:- 較小的記憶體佔用
- 較簡單的程式碼
- 只能讀寫CSV檔案
pandas
函式庫:- 較大的記憶體佔用
- 較複雜的程式碼
- 可以讀寫多種檔案格式,包括CSV、Excel、JSON等
- 提供了更多的資料處理功能,包括資料篩選、分組、合併等
Python 中的檔案寫入和讀取
Python 是一種強大的語言,能夠輕鬆地處理各種檔案格式。下面,我們將探討如何在 Python 中建立和寫入 CSV 和 JSON 檔案。
建立 DataFrame
要建立一個 DataFrame,我們需要先建立一個字典(dictionary),這是一種儲存資料的資料結構,使用鍵值對(key-value pair)來儲存資料。字典的值可以是任何 Python 資料型別,例如陣列(array)。
import pandas as pd
# 建立字典
data = {'Name': ['Paul', 'Bob', 'Susan', 'Yolanda'],
'Age': [23, 45, 18, 21]}
# 建立 DataFrame
df = pd.DataFrame(data)
寫入 CSV 檔案
要將 DataFrame 寫入 CSV 檔案,我們可以使用 to_csv()
方法,並傳入檔名。預設情況下,行名稱(row names)將被寫入檔案,如果不需要行名稱,可以傳入 index=False
引數。
# 寫入 CSV 檔案
df.to_csv('fromdf.csv', index=False)
寫入 JSON 檔案
JSON(JavaScript Object Notation)是一種常見的資料格式,尤其是在 API 呼叫中。Python 有一個標準函式庫叫做 json
,可以用於處理 JSON 資料。
import json
# 建立 JSON 資料
json_data = {'Name': ['Paul', 'Bob', 'Susan', 'Yolanda'],
'Age': [23, 45, 18, 21]}
# 寫入 JSON 檔案
with open('data.json', 'w') as f:
json.dump(json_data, f)
讀取 JSON 檔案
要讀取 JSON 檔案,可以使用 json.load()
方法。
# 讀取 JSON 檔案
with open('data.json', 'r') as f:
json_data = json.load(f)
使用Python建立和讀取JSON檔
JSON(JavaScript Object Notation)是一種輕量級的資料交換格式,廣泛用於網路應用程式中。Python提供了json
模組來處理JSON資料。在本節中,我們將介紹如何使用Python建立和讀取JSON檔。
建立JSON檔
要建立JSON檔,首先需要匯入json
模組和Faker
類別,然後使用Faker
生成假資料。以下是建立JSON檔的步驟:
- 匯入必要的模組:
import json from faker import Faker
2. 建立一個`Faker`物件:
```python
fake = Faker()
- 開啟一個檔案以寫入模式:
output = open(‘data.json’, ‘w’)
4. 建立一個字典來儲存資料:
```python
alldata = {}
alldata['records'] = []
- 使用
Faker
生成假資料,並將其加入字典中:
for x in range(1000): data = { “name”: fake.name(), “age”: fake.random_int(min=18, max=80, step=1), “street”: fake.street_address(), “city”: fake.city(), “state”: fake.state(), “zip”: fake.zipcode(), “lng”: float(fake.longitude()), “lat”: float(fake.latitude()) } alldata[‘records’].append(data)
6. 使用`json.dump()`方法將字典寫入檔案:
```python
json.dump(alldata, output)
讀取JSON檔
要讀取JSON檔,需要使用json.load()
方法。以下是讀取JSON檔的步驟:
- 開啟檔案以讀取模式:
with open(“data.json”, “r”) as f:
2. 使用`json.load()`方法將JSON資料讀入字典:
```python
data = json.load(f)
這樣就可以讀取JSON檔的內容了。
範例程式碼
以下是完整的範例程式碼:
import json
from faker import Faker
# 建立假資料
fake = Faker()
output = open('data.json', 'w')
alldata = {}
alldata['records'] = []
for x in range(1000):
data = {
"name": fake.name(),
"age": fake.random_int(min=18, max=80, step=1),
"street": fake.street_address(),
"city": fake.city(),
"state": fake.state(),
"zip": fake.zipcode(),
"lng": float(fake.longitude()),
"lat": float(fake.latitude())
}
alldata['records'].append(data)
json.dump(alldata, output)
# 讀取JSON檔
with open("data.json", "r") as f:
data = json.load(f)
print(data)
這個範例程式碼會建立一個包含1000條假資料的JSON檔,然後讀取這個檔案並印出其內容。
JSON 資料檢查與處理
在處理 JSON 資料時,瞭解如何正確地載入和轉換資料至 pandas DataFrame 是非常重要的。以下是玄貓對 JSON 資料檢查和處理的步驟。
載入 JSON 資料
首先,載入 JSON 資料可以使用 json
函式函式庫的 loads()
方法。這個方法可以將 JSON 字串轉換為 Python 物件。
import json
# 載入 JSON 資料
with open('data.json', 'r') as f:
data = json.load(f)
檢查 JSON 資料
載入 JSON 資料後,可以使用 data['records'][0]
來檢查第一筆資料。或者,可以使用 data['records'][0]['name']
來檢查第一筆資料的名稱欄位。
# 檢查第一筆資料
print(data['records'][0])
# 檢查第一筆資料的名稱欄位
print(data['records'][0]['name'])
轉換 JSON 資料至 pandas DataFrame
如果 JSON 資料是清晰且格式良好的,可以使用 pd.read_json()
方法直接載入至 pandas DataFrame。
import pandas as pd
# 載入 JSON 資料至 pandas DataFrame
df = pd.read_json('data.json')
但是,如果 JSON 資料是巢狀結構,例如 records 字典中的資料,就需要額外的步驟來載入。
# 載入 JSON 資料
with open('data.json', 'r') as f:
data = json.load(f)
# 正規化 JSON 資料
df = pd.json_normalize(data, record_path='records')
寫入 JSON 資料
寫入 JSON 資料可以使用 to_json()
方法。可以指定 orient
引數來決定 JSON 資料的格式。
# 寫入 JSON 資料
df.to_json('output.json', orient='records')
圖表翻譯
以下是使用 Mermaid 圖表來視覺化 JSON 資料的載入和轉換過程:
graph LR A[JSON 資料] -->|載入|> B[pandas DataFrame] B -->|轉換|> C[JSON 資料] C -->|寫入|> D[JSON 檔案]
圖表翻譯:
此圖表展示了 JSON 資料的載入、轉換和寫入過程。首先,JSON 資料被載入至 pandas DataFrame。然後,DataFrame 被轉換為 JSON 資料。最後,JSON 資料被寫入至 JSON 檔案。
使用 Apache Airflow 建立資料管線
Apache Airflow 是一個強大的工具,能夠幫助您建立和管理資料管線。它使用 Python 函式和 Bash 或其他操作員來建立任務,這些任務可以組合成一個有向非迴圈圖(DAG)。這意味著每個任務都會在完成後移動到下一個任務。
建立 CSV 到 JSON 資料管線
首先,讓我們建立一個簡單的 DAG,以瞭解 Airflow 的工作原理。這個 DAG 會列印預出一條訊息,然後讀取 CSV 檔案並列印預出所有名稱的清單。
步驟 1:匯入必要的函式庫
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import pandas as pd
步驟 2:定義 CSV 到 JSON 函式
def CSVToJson():
df = pd.read_csv('/home/paulcrickard/data.csv')
df.to_json('/home/paulcrickard/data.json', orient='records')
步驟 3:建立 DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': dt.datetime(2023, 3, 20),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'csv_to_json',
default_args=default_args,
schedule_interval=timedelta(days=1),
)
bash_task = BashOperator(
task_id='print_message',
bash_command='echo "Hello World!"',
dag=dag,
)
python_task = PythonOperator(
task_id='csv_to_json',
python_callable=CSVToJson,
dag=dag,
)
bash_task >> python_task
執行 DAG
現在,您可以使用 Airflow 的 GUI 來執行和監控您的 DAG。只要點選「Trigger DAG」按鈕,就可以啟動 DAG 的執行。
結果
您的 DAG 會列印預出一條訊息,然後讀取 CSV 檔案並將其轉換為 JSON 檔案。JSON 檔案將包含所有名稱的清單,格式如下:
[
{"name": "Henry Lee", "age": 42, "street": "57850 Zachary Camp", "city": "Lake Jonathon", "state": "Rhode Island", "zip": "93363", "lng": -161.561209, "lat": 72.086145},
{"name": "Corey Combs DDS", "age": 43, "street": "60066 Ruiz Plaza Apt. 752", "city": "East Kaitlin", "state": "Alabama", "zip": "16297", "lng": 123.894456, "lat": -50.211986}
]
這就是使用 Apache Airflow 建立 CSV 到 JSON 資料管線的基本步驟。您可以根據自己的需求自訂 DAG 和任務,以建立更複雜的資料管線。
Apache Airflow資料管線建立
資料管線概述
Apache Airflow是一個強大的工作流程管理系統,能夠幫助您建立、管理和監控資料管線。在本節中,我們將建立一個簡單的資料管線,負責從CSV檔案中讀取資料,然後將其轉換為JSON檔案。
步驟1:讀取CSV檔案
首先,我們需要讀取CSV檔案並將其轉換為Pandas DataFrame。以下是示例程式碼:
import pandas as pd
# 讀取CSV檔案
df = pd.read_csv('data.csv')
# 列印每行資料的名稱
for i, r in df.iterrows():
print(r['name'])
# 將DataFrame轉換為JSON檔案
df.to_json('data.json', orient='records')
這段程式碼讀取名為data.csv
的CSV檔案,然後將其轉換為Pandas DataFrame。接著,它列印每行資料的名稱,最後將DataFrame轉換為JSON檔案並儲存為data.json
。
步驟2:建立Airflow DAG
接下來,我們需要建立Airflow DAG(Directed Acyclic Graph)以管理資料管線。以下是示例程式碼:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'paulcrickard',
'start_date': datetime(2020, 3, 18),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'MyCSVDAG',
default_args=default_args,
schedule_interval='@daily',
)
這段程式碼建立了一個名為MyCSVDAG
的DAG,具有預設引數,包括所有者、開始日期、重試次數和重試延遲時間。DAG的排程間隔設為每日執行一次。
步驟3:定義Airflow任務
最後,我們需要定義Airflow任務以執行資料管線。以下是示例程式碼:
def read_csv(**kwargs):
# 讀取CSV檔案
df = pd.read_csv('data.csv')
# 列印每行資料的名稱
for i, r in df.iterrows():
print(r['name'])
# 將DataFrame轉換為JSON檔案
df.to_json('data.json', orient='records')
task = PythonOperator(
task_id='read_csv',
python_callable=read_csv,
dag=dag,
)
這段程式碼定義了一個名為read_csv
的任務,負責讀取CSV檔案,列印每行資料的名稱,然後將DataFrame轉換為JSON檔案。任務使用PythonOperator
運運算元執行。
Airflow 中的 DAG 計劃與任務定義
在 Airflow 中,DAG(Directed Acyclic Graph)是一個有向非迴圈圖,代表了一系列任務之間的依賴關係。定義一個 DAG 需要指定其 start_date
和 schedule_interval
,這兩個引數決定了 DAG 的執行時間。
例如,若要建立一個每 5 分鐘執行一次的 DAG,可以使用以下程式碼:
from datetime import datetime, timedelta
from airflow import DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 3, 21),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'MyCSVDAG',
default_args=default_args,
schedule_interval=timedelta(minutes=5),
) as dag:
# 任務定義
pass
在這個例子中,start_date
設定為 2023 年 3 月 21 日,schedule_interval
設定為每 5 分鐘執行一次。
任務定義
Airflow 提供了多種預建的運運算元(Operator),可以用來定義任務。常用的運運算元包括 BashOperator、PythonOperator 和 PostgresOperator。
以下是使用 BashOperator 和 PythonOperator 定義任務的例子:
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
def CSVToJson():
# 讀取 CSV 檔案並轉換為 JSON
pass
print_starting = BashOperator(
task_id='starting',
bash_command='echo "I am reading the CSV now....."'
)
CSVJson = PythonOperator(
task_id='convertCSVtoJson',
python_callable=CSVToJson
)
在這個例子中,print_starting
任務使用 BashOperator 執行一個 bash 命令,CSVJson
任務使用 PythonOperator 執行一個 Python 函式 CSVToJson
。
任務連線
定義了任務之後,需要將它們連線起來,以指定任務之間的依賴關係。可以使用 >>
運運算元來連線任務:
print_starting >> CSVJson
這個連線指定了 print_starting
任務必須在 CSVJson
任務之前執行。
讀寫檔案
在 Airflow 中,可以使用 FileSensor
運運算元來讀寫檔案。以下是使用 FileSensor
讀取 CSV 檔案的例子:
from airflow.sensors.filesystem import FileSensor
file_sensor = FileSensor(
task_id='check_file',
filepath='/path/to/data.csv'
)
這個例子中,file_sensor
任務會等待 /path/to/data.csv
檔案出現後才會繼續執行。
可以使用 PythonOperator
執行 Python 函式來讀寫檔案。以下是使用 PythonOperator
讀取 CSV 檔案的例子:
import pandas as pd
def read_csv():
df = pd.read_csv('/path/to/data.csv')
# 處理 DataFrame
pass
read_csv_task = PythonOperator(
task_id='read_csv',
python_callable=read_csv
)
這個例子中,read_csv
函式會讀取 /path/to/data.csv
檔案並傳回一個 Pandas DataFrame。
Apache Airflow 中的 DAG 設計與執行
在 Apache Airflow 中,DAG(Directed Acyclic Graph)是用於定義工作流程的核心概念。以下是如何使用 Airflow 建立和執行 DAG 的步驟:
從技術架構視角來看,Python 提供了強大的工具和函式函式庫,例如內建的 csv
模組和 pandas
函式庫,簡化了 CSV 和 JSON 檔案的讀寫操作。csv
模組適用於輕量級的 CSV 檔案處理,而 pandas
則提供了更豐富的資料處理功能,尤其適用於大型資料集。然而,pandas
的記憶體佔用較大,需要根據實際情況權衡選擇。此外,DictReader
提供了更直觀的資料存取方式,提升了程式碼的可讀性。對於 JSON 檔案的處理,Python 的 json
模組提供了便捷的讀寫功能,並可透過 pd.json_normalize()
處理巢狀結構的 JSON 資料。Apache Airflow 則進一步提升了資料管線的管理效率,透過 DAG 定義工作流程,並利用 BashOperator
和 PythonOperator
等執行不同型別的任務,實作了自動化的資料處理流程。然而,Airflow 的學習曲線較陡峭,需要投入時間學習其核心概念和使用方法。對於重視長期穩定性的企業,建議逐步匯入 Airflow,並從簡單的資料管線開始,逐步提升其複雜度。隨著 Airflow 生態系統的日趨成熟,我們預見其應用門檻將大幅降低,成為資料工程領域不可或缺的工具。