Python 的 Pandas 函式函式庫提供強大的資料處理能力,可以有效地進行資料清理、轉換和增強。搭配 Airflow 可以將這些資料處理步驟自動化,建構穩固的資料管道。本文的程式碼範例示範瞭如何使用 Pandas 處理日期時間欄位、利用外部地理編碼器 enrich 資料、合併不同來源的資料,以及清理地址資料等常見的資料處理任務。此外,文章也說明瞭如何將處理後的資料整合到 Elasticsearch 和 Kibana,實作資料視覺化,讓使用者更直觀地理解資料的價值。最後,文章還示範瞭如何使用 Airflow 建構 DAG,將資料處理和視覺化流程自動化,提高效率並降低錯誤率。
資料清理
資料清理是指去除不需要的資料或是修正錯誤的資料。例如,若我們有一個包含日期和時間的欄位,我們可能需要將它們分別轉換成日期和時間的欄位。
import pandas as pd
# 建立一個示例資料框
data = {
'trip_id': [1613335, 1613639, 1613708, 1613867, 1636714],
'started_at': ['5/21/2019 18:33', '5/21/2019 19:07', '5/21/2019 19:13', '5/21/2019 19:29', '5/24/2019 13:38']
}
df = pd.DataFrame(data)
# 將 started_at 欄位分別轉換成日期和時間的欄位
df['date'] = pd.to_datetime(df['started_at']).dt.date
df['time'] = pd.to_datetime(df['started_at']).dt.time
print(df)
資料轉換
資料轉換是指將資料從一個型別轉換成另一個型別。例如,若我們有一個包含日期和時間的欄位,我們可能需要將它們轉換成 timestamp 的型別。
# 將 date 和 time 欄位合併成一個 timestamp 欄位
df['timestamp'] = pd.to_datetime(df.apply(lambda row: f"{row['date']} {row['time']}", axis=1))
print(df)
資料增強
資料增強是指增加新的欄位或是修正現有的欄位以增強資料的品質。例如,若我們有一個包含 trip_id 的欄位,我們可能需要增加一個新的欄位來描述 trip 的型別。
# 增加一個新的欄位來描述 trip 的型別
df['trip_type'] = df['trip_id'].apply(lambda x: 'short' if x < 1614000 else 'long')
print(df)
處理日期時間欄位
在處理日期時間欄位時,瞭解欄位的資料型態非常重要。例如,started_at
欄位最初被認為是物件(object),但實際上它應該是日期時間(datetime)型態。當試圖使用日期篩選 started_at
欄位時,由於型態不符,會傳回所有列。
when = '2019-05-23'
x = df[(df['started_at'] > when)]
len(x)
為了正確處理日期時間欄位,可以使用 pd.to_datetime()
函式將欄位轉換為日期時間型態。這個函式可以指定欄位名稱和日期時間格式。
df['started_at'] = pd.to_datetime(df['started_at'], format='%m/%d/%Y %H:%M')
d.dtypes
轉換後,started_at
欄位就變成日期時間型態,可以正確地使用日期進行篩選。
when = '2019-05-23'
d[(d['started_at'] > when)]
資料增強
原始資料可能缺乏某些重要資訊,例如座標。為了能夠對這些資料進行地理空間分析或對映,需要將座標加入資料中。這可以透過外部地理編碼器(geocoder)來完成。
步驟一:選取子集資料
首先,選取資料中最常見的五個起始位置,並將它們放入一個新的 DataFrame 中。
new = pd.DataFrame(df['start_location_name'].value_counts().head(5))
步驟二:使用外部地理編碼器
使用外部地理編碼器(如 City of Albuquerque 的公用地理編碼器)為這些位置取得座標。
import requests
def get_coordinates(location):
# 使用外部地理編碼器 API 取得座標
url = f"https://example.com/geocode?location={location}"
response = requests.get(url)
data = response.json()
# 處理 API 回應取得座標
latitude = data['latitude']
longitude = data['longitude']
return latitude, longitude
# 將座標加入新 DataFrame 中
new['latitude'] = new['start_location_name'].apply(lambda x: get_coordinates(x)[0])
new['longitude'] = new['start_location_name'].apply(lambda x: get_coordinates(x)[1])
步驟三:合併資料
將增強的資料(包含座標)合併回原始資料集中,以便進行進一步的分析。
df = pd.merge(df, new, on='start_location_name')
這樣,資料就被成功增強,包含了必要的座標資訊,可以進行地理空間分析或對映等操作。
內容解密:
在這個過程中,我們首先了解了如何處理日期時間欄位的轉換,然後學習瞭如何使用外部地理編碼器為資料增加座標。這些步驟對於進行地理空間分析或對映是非常重要的。透過這些步驟,資料被成功增強,包含了必要的座標資訊,可以進行進一步的分析。
圖表翻譯:
flowchart TD A[原始資料] --> B[選取子集] B --> C[使用外部地理編碼器] C --> D[合併資料] D --> E[進行地理空間分析或對映]
此圖表展示了資料增強的流程,從選取子集資料、使用外部地理編碼器取得座標,到合併資料,最終進行地理空間分析或對映。
地址資料清理與處理
為了進行地理編碼(geocoding),我們需要從原始地址資料中提取街道地址。原始資料包含了多餘的資訊,如城市、州和郵遞區號。讓我們一步步清理和處理這些資料。
步驟1:分割地址資料
首先,我們使用字串分割功能,將位址列位按照逗號(,)分割,僅保留第一部分,即街道地址。
n = new['address'].str.split(pat=',', n=1, expand=True)
步驟2:處理交叉路口地址
觀察資料後,我們發現有些地址是交叉路口的形式,如「Central @ Tingley」。地理編碼器需要這些地址以「and」連線的形式出現,例如「Central and Tingley」。因此,我們需要替換「@」符號為「and」。
replaced = n[0].str.replace("@", "and")
步驟3:建立新的街道位址列位
現在,我們將清理和處理過的街道地址資料存入一個新的欄位中,命名為「street」。
new['street'] = n[0]
new['street'] = replaced
完成的資料表
經過上述步驟後,資料表現在包含了一個新的「street」欄位,該欄位只包含了街道地址的資訊,已經準備好進行地理編碼。
print(new)
內容解密:
str.split()
方法用於分割字串,按照指定的分隔符(本例中為逗號)將字串分割為子字串,並傳回一個列表。n=1
引數指定只分割一次,從左邊開始。expand=True
引數使得傳回結果為一個 DataFrame,每個分割部分作為一列。str.replace()
方法用於替換字串中的某些字元,本例中替換「@」為「and」。- 新的「street」欄位包含了清理和處理過的街道地址資料,為地理編碼做好了準備。
地理編碼與資料豐富化
在地理資訊系統中,地理編碼是一個至關重要的步驟,能夠將地址轉換為地理坐標。這個過程使得資料能夠在地圖上進行視覺化和分析。以下是如何對街道地址進行地理編碼,並將其與其他資料進行合併以豐富化資料的過程。
步驟1:載入必要的函式庫和資料
首先,需要載入必要的Python函式庫,包括pandas
,用於資料操作和分析。然後,載入原始的街道地址資料和另一個包含地理編碼資訊的CSV檔案。
import pandas as pd
# 載入原始街道地址資料
street_data = pd.read_csv('street_data.csv')
# 載入地理編碼資料
geocoded_data = pd.read_csv('geocodedstreet.csv')
步驟2:合併資料
接下來,需要根據街道地址將兩個DataFrame合併起來。這可以使用merge
函式實作,根據街道位址列位進行合併。
# 合併資料
merged_data = pd.merge(street_data, geocoded_data, on='street')
步驟3:檢查合併結果
合併後,需要檢查結果以確保資料是否正確合併。可以檢視合併後的DataFrame的結構和內容。
# 檢查合併結果
print(merged_data.head())
步驟4:資料豐富化
透過合併,現在的DataFrame包含了原始的街道地址以及對應的x和y坐標。這使得資料得到了豐富化,可以用於進一步的分析和視覺化。
# 範例:計算距離
def calculate_distance(x1, y1, x2, y2):
return ((x2 - x1) ** 2 + (y2 - y1) ** 2) ** 0.5
# 應用於資料
merged_data['distance'] = merged_data.apply(lambda row: calculate_distance(row['x'], row['y'], 0, 0), axis=1)
地理編碼資料合併
合併地理編碼資料
在進行地理編碼資料分析時,經常需要將不同來源的資料合併,以獲得更全面和準確的資訊。下面是使用 Pandas 進行地理編碼資料合併的示例。
步驟 1:載入地理編碼資料
import pandas as pd
# 載入地理編碼資料
geo = pd.read_csv('geocodedstreet.csv')
步驟 2:檢視地理編碼資料
print(geo)
輸出結果:
street x y
0 1898 Mountain Rd NW -106.667146 35.098104
1 Central and Tingley -106.679271 35.091205
2 2550 Central Ave NE -106.617420 35.080646
3 2901 Central Ave NE -106.612180 35.081120
4 330 Tijeras Ave NW -106.390355 35.078958
5 nothing street -106.000000 35.000000
步驟 3:合併地理編碼資料
# 合併地理編碼資料
joined = new.join(other=geo, how='left', lsuffix='_new', rsuffix='_geo')
步驟 4:檢視合併後的資料
print(joined[['street_new', 'street_geo', 'x', 'y']])
輸出結果:
street_new street_geo x y
0 1898 Mountain Rd NW 1898 Mountain Rd NW -106.667146 35.098104
1 Central and Tingley Central and Tingley -106.679271 35.091205
在上面的示例中,我們使用 join
方法將兩個 DataFrame 合併在一起。how
引數指定了合併的方式,可以是 left
、right
或 inner
。lsuffix
和 rsuffix
引數指定了左邊和右邊 DataFrame 中重覆的列的字尾。
圖表翻譯:
flowchart TD A[載入地理編碼資料] --> B[檢視地理編碼資料] B --> C[合併地理編碼資料] C --> D[檢視合併後的資料]
內容解密:
在這個示例中,我們使用 Pandas 的 join
方法將兩個 DataFrame 合併在一起。首先,我們載入地理編碼資料到 geo
DataFrame 中。然後,我們檢視 geo
DataFrame 的內容。接下來,我們合併 new
和 geo
DataFrame,使用 left
合併方式,並指定左邊和右邊 DataFrame 中重覆的列的字尾。最後,我們檢視合併後的資料。
資料清理與轉換
資料清理和轉換是資料科學中非常重要的步驟,能夠確保資料的品質和準確性。在本節中,我們將探討如何使用 Python 和 Airflow 來清理和轉換資料。
資料清理
資料清理是指從原始資料中移除不需要的資料、錯誤的資料或重複的資料。以下是資料清理的步驟:
- 讀取資料:首先,需要讀取原始資料,可以使用 pandas 的
read_csv
函式來讀取 CSV 檔案。 - 檢查資料:檢查資料是否有缺失值、錯誤值或重複值,可以使用 pandas 的
isnull
、duplicated
函式來檢查。 - 移除不需要的資料:移除不需要的資料,可以使用 pandas 的
drop
函式來移除指定的列或行。 - 轉換資料型別:轉換資料型別,可以使用 pandas 的
astype
函式來轉換指定的列的資料型別。
資料轉換
資料轉換是指將原始資料轉換成適合分析的格式。以下是資料轉換的步驟:
- 合併資料:合併多個資料表,可以使用 pandas 的
merge
函式來合併指定的欄位。 - 分割資料:分割資料,可以使用 pandas 的
split
函式來分割指定的欄位。 - 轉換資料格式:轉換資料格式,可以使用 pandas 的
to_csv
、to_json
函式來轉換指定的格式。
使用 Airflow 來清理和轉換資料
Airflow 是一個工作流程管理系統,可以用來自動化資料清理和轉換的流程。以下是使用 Airflow 來清理和轉換資料的步驟:
- 安裝 Airflow:安裝 Airflow,可以使用 pip 來安裝。
- 建立 DAG:建立 DAG,可以使用 Airflow 的
DAG
類別來建立。 - 定義任務:定義任務,可以使用 Airflow 的
Task
類別來定義。 - 執行 DAG:執行 DAG,可以使用 Airflow 的
execute
函式來執行。
範例
以下是使用 Airflow 來清理和轉換資料的範例:
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import pandas as pd
default_args = {
'owner': 'paulcrickard',
'start_date': dt.datetime(2020, 4, 13),
'retries': 1,
'retry_delay': dt.timedelta(minutes=5),
}
def clean_scooter():
# 讀取資料
data = pd.read_csv('scooter_data.csv')
# 檢查資料
print(data.isnull().sum())
# 移除不需要的資料
data = data.drop(['region_id'], axis=1)
# 轉換資料型別
data['started_at'] = pd.to_datetime(data['started_at'])
# 寫入檔案
data.to_csv('clean_scooter_data.csv', index=False)
dag = DAG(
'clean_scooter_dag',
default_args=default_args,
schedule_interval=timedelta(days=1),
)
task = PythonOperator(
task_id='clean_scooter_task',
python_callable=clean_scooter,
dag=dag,
)
這個範例定義了一個 DAG,包含一個任務,該任務負責清理和轉換 scooter 資料。任務會讀取原始資料,檢查資料,移除不需要的資料,轉換資料型別,然後寫入檔案。
資料清理與過濾流程
步驟一:載入原始資料
首先,我們需要載入名為 scooter.csv
的原始資料集。這個資料集包含了分享單車的相關資訊,包括騎乘的開始時間、結束時間、騎乘距離等。
import pandas as pd
# 載入原始資料
df = pd.read_csv('scooter.csv')
步驟二:資料清理
接下來,我們需要對資料進行清理。首先,移除不需要的欄位 region_id
。然後,將 started_at
欄位轉換為 datetime 格式,以便於後續的日期篩選。
# 移除不需要的欄位
df.drop(columns=['region_id'], inplace=True)
# 將 started_at 欄位轉換為 datetime 格式
df['started_at'] = pd.to_datetime(df['started_at'], format='%m/%d/%Y %H:%M')
步驟三:儲存清理後的資料
清理後的資料儲存為新檔案 cleanscooter.csv
。
# 儲存清理後的資料
df.to_csv('cleanscooter.csv', index=False)
步驟四:定義資料過濾函式
定義一個函式 filterData
,負責載入清理後的資料,並根據指定的開始日期和結束日期進行過濾。
def filterData():
# 載入清理後的資料
df = pd.read_csv('cleanscooter.csv')
# 指定開始日期和結束日期
from_date = '2019-05-23'
to_date = '2019-06-03'
# 根據日期進行過濾
filtered_df = df[(df['started_at'] > from_date) & (df['started_at'] < to_date)]
# 儲存過濾後的資料
filtered_df.to_csv('may23-june3.csv', index=False)
步驟五:使用 Airflow 定義工作流程
使用 Airflow 定義一個工作流程 CleanData
,包含兩個任務:清理資料和過濾資料。
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 3, 21),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'CleanData',
default_args=default_args,
schedule_interval=timedelta(minutes=5),
) as dag:
# 定義清理資料任務
clean_data = PythonOperator(
task_id='clean',
python_callable=cleanScooter
)
# 定義過濾資料任務
select_data = PythonOperator(
task_id='filter',
python_callable=filterData
)
這個工作流程每 5 分鐘執行一次,先清理資料,然後過濾資料。過濾後的資料會儲存為新檔案 may23-june3.csv
。
Apache Airflow 和 Kibana 整合應用
前言
在前面的章節中,我們學習瞭如何使用 Python、Apache Airflow 和 NiFi 建立資料管道。在這一章中,我們將使用這些技能建立一個連線到 SeeClickFix 的資料管道,下載城市的所有問題,並將其載入 Elasticsearch。
建立資料管道
這個資料管道將與前面的管道稍有不同,因為我們需要使用一個技巧來啟動它。我們將有兩個路徑到同一個資料函式庫,其中一個我們將在第一次執行後關閉,我們還將有一個處理器連線到自己以建立成功關係。
Mapping 資料型別
在建立管道之前,我們需要在 Elasticsearch 中對映一個欄位,以便我們可以從坐標中取得益處。為此,我們可以開啟 Kibana,在 Dev Tools 中輸入以下程式碼:
PUT scf
{
"mappings": {
"properties": {
"coords": {
"type": "geo_point"
}
}
}
}
這將建立一個名為 scf
的索引,並將 coords
欄位對映為 geo_point
型別。
建立管道
現在,我們可以開始建立管道了。首先,我們需要建立一個連線到 SeeClickFix 的資料源。然後,我們需要建立一個處理器來下載城市的所有問題。最後,我們需要建立一個載入器來將資料載入 Elasticsearch。
以下是管道的架構:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 3, 21),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'see_click_fix',
default_args=default_args,
schedule_interval=timedelta(hours=8),
)
download_data = BashOperator(
task_id='download_data',
bash_command='curl -X GET "https://example.com/api/issues" -H "Authorization: Bearer YOUR_API_KEY" > issues.json',
dag=dag,
)
load_data = BashOperator(
task_id='load_data',
bash_command='curl -X POST "http://localhost:9200/scf/_doc" -H "Content-Type: application/json" -d @issues.json',
dag=dag,
)
end_task = BashOperator(
task_id='end_task',
bash_command='echo "Pipeline finished!"',
dag=dag,
)
download_data >> load_data >> end_task
這個管道將每 8 小時下載一次城市的所有問題,並將其載入 Elasticsearch。
建立 Kibana 儀錶板
現在,我們可以建立一個 Kibana 儀錶板來視覺化資料了。首先,我們需要建立一個索引模式來定義資料的結構。然後,我們可以建立視覺化元件來顯示資料。
以下是建立索引模式的步驟:
- 開啟 Kibana,前往 Management > Index Patterns。
- 點選 Create index pattern。
- 輸入索引模式名稱,例如
scf
。 - 選擇 Time Filter field name,例如
@timestamp
。 - 點選 Create。
現在,我們可以建立視覺化元件了。以下是建立視覺化元件的步驟:
- 開啟 Kibana,前往 Visualize。
- 點選 Create Visualization。
- 選擇視覺化型別,例如 Map。
- 選擇索引模式,例如
scf
。 - 組態視覺化元件的設定,例如 Geo Point。
- 點選 Save。
現在,我們可以建立儀錶板了。以下是建立儀錶板的步驟:
- 開啟 Kibana,前往 Dashboard。
- 點選 Create Dashboard。
- 選擇視覺化元件,例如 Map。
- 組態儀錶板的設定,例如 Title。
- 點選 Save。
現在,我們可以檢視儀錶板了。以下是檢視儀錶板的步驟:
- 開啟 Kibana,前往 Dashboard。
- 選擇儀錶板,例如 SeeClickFix。
- 檢視儀錶板的內容,例如 Map。
從資料清理、轉換到增強,本文深入探討了資料處理流程的各個環節,並以分享單車資料集為例,示範瞭如何運用 Python 和 Pandas 進行實務操作。從日期時間欄位的格式轉換、地址資料的清理與地理編碼,到最終整合外部資料豐富資料內容,每個步驟都體現了資料處理的精細與複雜性。技術堆疊的各層級協同運作中體現,資料處理並非單一技術的應用,而是整合多種工具和方法的系統工程。
透過多維比較分析,本文不僅展示瞭如何使用 pd.to_datetime()
函式處理日期時間資料,更點明瞭資料型態在資料篩選和分析中的重要性。此外,文章也詳細說明瞭利用外部地理編碼器取得座標資訊,以及如何將這些資訊整合回原始資料集,為地理空間分析奠定基礎。實務佈署中的常見陷阱與對策,例如地址資料中交叉路口格式的處理,也在文中得到了清晰的闡述和解決方案。
展望未來,資料處理技術將持續朝向自動化和智慧化發展。隨著機器學習和人工智慧技術的進步,預期將出現更強大的資料清理和轉換工具,能夠自動識別和修正資料錯誤,進一步提升資料處理效率。隨著生態系統日趨完善,我們預見此技術的應用門檻將大幅降低。對於重視資料品質的企業而言,掌握這些新興技術將是提升商業決策效率的關鍵。