隨著資料量的爆炸性增長,有效地處理和分析大資料變得至關重要。本文以廣泛使用的 MovieLens 資料集為例,探討如何運用 Python 和 MongoDB 處理和分析大資料。首先,我們會使用 Python 對原始資料進行前處理,包含讀取、清理和格式轉換,將其轉換為易於處理的 JSON 格式。接著,示範如何將處理後的資料匯入 MongoDB 資料函式庫,並利用 MongoDB 的聚合管道功能進行高效的資料查詢和分析,例如計算電影平均評分、觀眾平均年齡等統計資料。此外,文章也提供直接將原始 JSON 資料儲存至 MongoDB 的方法,以及如何使用 Twitter API 擷取資料的程式碼範例,展現更全面的資料處理流程。
大資料處理與分析實務:以MovieLens資料集為例
大資料是21世紀的熱門話題。本文將以GroupLens提供的MovieLens資料集為例,展示如何處理和分析大資料。MovieLens資料集包含了約一百萬條評分記錄,涵蓋了六千名使用者對四千部電影的評價。
資料前處理
首先,我們需要對資料進行前處理,包括讀取、清理和格式化。以下程式碼展示瞭如何讀取MovieLens資料集並將其轉換為JSON格式:
import json, csv
def read_dat(h, f):
return csv.DictReader((line.replace('::', ':') for line in open(f)), delimiter=':', fieldnames=h, quoting=csv.QUOTE_NONE)
def gen_dict(d):
for row in d:
yield dict(row)
def dump_json(f, l, d):
f = open(f, 'w')
f.write('[')
for i, row in enumerate(d):
j = json.dumps(row)
f.write(j)
if i < l - 1:
f.write(',')
else:
f.write(']')
f.close()
def read_json(f):
with open(f) as f:
return json.load(f)
def display(n, f):
for i, row in enumerate(f):
if i < n:
print(row)
print()
if __name__ == "__main__":
u_dat = 'data/ml-1m/users.dat'
m_dat = 'data/ml-1m/movies.dat'
r_dat = 'data/ml-1m/ratings.dat'
unames = ['user_id', 'gender', 'age', 'occupation', 'zip']
mnames = ['movie_id', 'title', 'genres']
rnames = ['user_id', 'movie_id', 'rating', 'timestamp']
# 讀取資料集並計算大小
users = read_dat(unames, u_dat)
ul = len(list(gen_dict(users)))
movies = read_dat(mnames, m_dat)
ml = len(list(gen_dict(movies)))
ratings = read_dat(rnames, r_dat)
rl = len(list(gen_dict(ratings)))
print('資料集大小:')
print('使用者', ul)
print('電影', ml)
print('評分', rl)
# 將資料集轉換為JSON格式
users = read_dat(unames, u_dat)
users = gen_dict(users)
movies = read_dat(mnames, m_dat)
movies = gen_dict(movies)
ratings = read_dat(rnames, r_dat)
ratings = gen_dict(ratings)
uf = 'data/users.json'
dump_json(uf, ul, users)
mf = 'data/movies.json'
dump_json(mf, ml, movies)
rf = 'data/ratings.json'
dump_json(rf, rl, ratings)
# 驗證資料
u = read_json(uf)
m = read_json(mf)
r = read_json(rf)
n = 1
display(n, u)
display(n, m)
display(n, r)
程式碼解密:
read_dat函式:讀取MovieLens資料集中的.dat檔案,將雙冒號(::)替換為單冒號(:),並使用csv.DictReader讀取資料。gen_dict函式:將OrderedDict列表轉換為普通的字典列表,以便於後續處理。dump_json函式:將資料寫入JSON檔案中。read_json函式:讀取JSON檔案中的資料。display函式:顯示前n條資料記錄,用於驗證資料是否正確。
資料清理與轉換
在取得JSON格式的資料後,我們需要對資料進行進一步的清理和轉換。以下程式碼展示瞭如何清理電影資料集:
import json, numpy as np
def read_json(f):
with open(f) as f:
return json.load(f)
def dump_json(f, d):
with open(f, 'w') as fout:
json.dump(d, fout)
def display(n, d):
[print(row) for i, row in enumerate(d) if i < n]
def get_indx(k, d):
return [row[k] for row in d if 'null' in row]
def get_data(k, l, d):
return [row for i, row in enumerate(d) if row[k] in l]
def get_unique(key, d):
s = set()
for row in d:
for k, v in row.items():
if k in key:
s.add(v)
return np.sort(list(s))
if __name__ == "__main__":
mf = 'data/movies.json'
m = read_json(mf)
# 清理電影資料
indx = get_indx('movie_id', m)
for row in m:
if row['movie_id'] in indx:
# 處理含有null值的記錄
row['title'] = row['title'] + ':' + row['genres']
row['genres'] = row['null'][0]
del row['null']
title = row['title'].split(" ")
year = title.pop()
year = ''.join(c for c in year if c not in '()')
row['title'] = ' '.join(title)
row['year'] = year
# 顯示清理後的資料
n = 2
display(n, m)
# 取得年份的唯一值
s = get_unique('year', m)
print('\n', s, '\n')
# 清理特定記錄
rec = get_data('year', ['Assignment'], m)
print(rec[0])
# 儲存清理後的資料
mf = 'data/cmovies.json'
dump_json(mf, m)
程式碼解密:
get_indx函式:取得含有特定鍵(null)的記錄索引。get_data函式:根據特定條件篩選資料。get_unique函式:取得特定欄位中的唯一值。- 清理電影資料:處理含有
null值的記錄,並將電影標題和年份分離。
連線MongoDB並進行分析
在完成資料清理後,我們可以將資料儲存到MongoDB中進行進一步的分析。以下程式碼展示瞭如何連線MongoDB:
class conn:
from pymongo import MongoClient
client = MongoClient('localhost', port=27017)
def __init__(self, dbname):
self.db = conn.client[dbname]
def getDB(self):
return self.db
程式碼解密:
conn類別:建立與MongoDB的連線,並提供取得資料函式庫的方法。
資料探索與MongoDB資料儲存實務
本章節主要探討如何使用Python處理JSON資料集,並將處理後的資料儲存至MongoDB資料函式庫中。文中提供了兩個程式碼範例,分別展示瞭如何進行資料探索、資料處理,以及如何將處理後的資料儲存至MongoDB。
資料處理與探索
首先,我們來看看第一個程式碼範例,該範例展示瞭如何讀取JSON資料集、進行資料處理,以及計算特定電影的平均評分、觀眾平均年齡等統計資訊。
程式碼解析
def get_info(*args):
a = [arg for arg in args]
ratings = [int(row[a[0][1]]) for row in a[2] if row[a[0][0]] == a[1]]
uids = [row[a[0][3]] for row in a[2] if row[a[0][0]] == a[1]]
title = [row[a[0][2]] for row in a[3] if row[a[0][0]] == a[1]]
age = [int(row[a[0][4]]) for col in uids for row in a[4] if col == row[a[0][3]]]
gender = [row[a[0][5]] for col in uids for row in a[4] if col == row[a[0][3]]]
return (ratings, title[0], uids, age, gender)
def generate(k, v, r, m, u):
for i, mid in enumerate(v):
dic = {}
rec = get_info(k, mid, r, m, u)
dic = {'_id': i, 'mid': mid, 'title': rec[1], 'avg_rating': np.mean(rec[0]),
'n_ratings': len(rec[0]), 'avg_age': np.mean(rec[3]),
'M': rec[4].count('M'), 'F': rec[4].count('F')}
dic['avg_rating'] = round(float(str(dic['avg_rating'])[:6]), 2)
dic['avg_age'] = round(float(str(dic['avg_age'])[:6]))
yield dic
def gen_ls(g):
for i, row in enumerate(g):
yield row
內容解密:
get_info函式:該函式接收多個引數,包括鍵值列表、電影ID、評分資料集、電影資料集和使用者資料集。它透過列表推導式提取特定電影的評分、使用者ID、電影標題、使用者年齡和性別等資訊。generate函式:該函式為每個電影ID生成一個包含電影統計資訊的字典,如平均評分、評分人數、平均年齡和男女比例等。它利用get_info函式取得原始資料,並進行計算和格式化。gen_ls函式:該函式是一個生成器,用於遍歷由generate函式產生的字典序列。
資料儲存至MongoDB
接下來的程式碼展示瞭如何將處理後的資料儲存至MongoDB資料函式庫。
程式碼解析
if __name__ == "__main__":
# 讀取JSON資料集
m = 'data/cmovies.json'
movies = np.array(read_json(m))
r = 'data/ratings.json'
ratings = np.array(read_json(r))
u = 'data/users.json'
users = np.array(read_json(u))
# 建立MongoDB連線
obj = conn.conn('test')
db = obj.getDB()
# 將生成器中的資料儲存至MongoDB
gen = generate(keys, new_mv, ratings, movies, users)
gls = gen_ls(gen)
movie_info = db.movie_info
movie_info.drop()
start = clock()
for row in gls:
movie_info.insert(row)
end = clock()
elapsed_ls = end - start
print(hf.format_timespan(elapsed_ls, detailed=True))
內容解密:
- 讀取資料集:程式首先讀取三個JSON檔案,分別包含電影資訊、評分資料和使用者資訊。
- 建立MongoDB連線:透過自定義的
conn類別建立與MongoDB的連線,並取得資料函式庫物件。 - 儲存資料:利用前面定義的生成器,將處理後的電影統計資訊逐一插入到MongoDB的
movie_info集合中。
直接儲存JSON資料至MongoDB
另一個程式碼範例直接將原始JSON資料集儲存至MongoDB。
程式碼解析
def create_db(c, d):
c = db[c]
c.drop()
for i, row in enumerate(d):
row['_id'] = i
c.insert(row)
if __name__ == "__main__":
u = read_json('data/users.json')
m = read_json('data/cmovies.json')
r = read_json('data/ratings.json')
obj = conn.conn('test')
db = obj.getDB()
start = clock()
create_db('users', u)
create_db('movies', m)
create_db('ratings', r)
end = clock()
elapsed_ls = end - start
print(hf.format_timespan(elapsed_ls, detailed=True))
內容解密:
create_db函式:該函式負責建立或更新MongoDB中的集合,並將提供的資料插入其中。- 主程式:讀取三個JSON檔案,將資料儲存至對應的MongoDB集合中,並記錄操作耗時。
綜上所述,本章節透過具體的程式碼範例,詳細介紹瞭如何使用Python處理JSON資料,並將其儲存至MongoDB的過程。這些範例對於理解大資料處理流程和實踐具有重要的參考價值。
使用 MongoDB 進行資料聚合與 Twitter 資料擷取
MongoDB 聚合管道簡介
MongoDB 的聚合管道是一種資料處理框架,模擬資料處理管道的概念。檔案進入多階段管道後,被轉換為聚合結果。除了按特定欄位或欄位分組和排序檔案以及聚合陣列內容外,管道階段還可以使用運算元執行諸如計算平均值或連線字串等任務。聚合管道使用原生 MongoDB 操作高效地進行資料聚合,是 MongoDB 中首選的資料聚合方法。
使用聚合管道比對檔案
以下程式碼範例展示瞭如何使用聚合管道比對檔案:
import sys, os
sys.path.append(os.getcwd() + '/classes')
import conn
def match_item(k, v, d):
pipeline = [{'$match': {k: v}}]
q = db.command('aggregate', d, pipeline=pipeline)
return q
if __name__ == "__main__":
obj = conn.conn('test')
db = obj.getDB()
movie = 'Toy Story'
q = match_item('title', movie, 'movie_info')
r = q['result'][0]
print(movie, 'document:')
print(r)
print('average rating', r['avg_rating'], '\n')
內容解密:
- 匯入必要的模組:程式碼首先匯入
sys、os和自定義的conn類別。 - 定義
match_item函式:該函式使用聚合管道根據給定的鍵值對比對檔案。 - 執行聚合操作:在
if __name__ == "__main__":區塊中,建立與 MongoDB 的連線,並使用match_item函式查詢特定電影的檔案。
多階段聚合管道範例
以下是一個多階段聚合管道的範例:
def stages(k, v, r, d):
pipeline = [
{'$match': {'$and': [{k: v}, {'rating': {'$eq': r}}]}},
{'$project': {'_id': 1, 'user_id': 1, 'movie_id': 1, 'rating': 1}},
{'$limit': 100}
]
q = db.command('aggregate', d, pipeline=pipeline)
return q
if __name__ == "__main__":
obj = conn.conn('test')
db = obj.getDB()
u = '3'
r = '5'
q = stages('user_id', u, r, 'ratings')
result = q['result']
print('ratings of', r, 'for user ' + str(u) + ':')
for i, row in enumerate(result):
print(row)
內容解密:
- 定義
stages函式:該函式使用一個三階段的聚合管道。- 第一階段:根據使用者 ID 和評分比對檔案。
- 第二階段:投影出需要的欄位。
- 第三階段:限制傳回的檔案數量。
- 執行多階段聚合操作:在主程式區塊中,使用
stages函式查詢特定使用者的評分,並列印結果。
從 Twitter 擷取資料
要從 Twitter 擷取資料,需要連線到 Twitter Streaming API。首先,需要註冊並取得 API 金鑰、API 金鑰、存取令牌和存取令牌金鑰。
儲存 Twitter 認證資訊到 JSON 檔案
import json
if __name__ == '__main__':
consumer_key = ''
consumer_secret = ''
access_token = ''
access_encrypted = ''
data = {}
data['ck'] = consumer_key
data['cs'] = consumer_secret
data['at'] = access_token
data['ae'] = access_encrypted
json_data = json.dumps(data)
with open('data/credentials.json', 'w') as obj:
obj.write('[\n' + json_data + '\n]')
內容解密:
- 匯入
json模組:用於處理 JSON 資料。 - 儲存認證資訊:將 Twitter 認證資訊儲存到 JSON 檔案中,以隱藏敏感資訊。
使用 TwitterSearch API 串流 Twitter 資料
from TwitterSearch import *
import json
class TwitSearch:
def __init__(self, cred, ls, limit):
self.cred = cred
self.ls = ls
self.limit = limit
def search(self):
num = 0
dt = []
try:
tso = TwitterSearchOrder()
tso.set_keywords(self.ls)
tso.set_language('en')
ts = TwitterSearch(
consumer_key=self.cred[0]['ck'],
consumer_secret=self.cred[0]['cs'],
access_token=self.cred[0]['at'],
access_token_secret=self.cred[0]['ae']
)
for tweet in ts.search_tweets_iterable(tso):
if num <= self.limit:
dic = {}
dic['_id'] = num
dic['tweeter'] = tweet['user']['screen_name']
dic['tweet_text'] = tweet['text']
dt.append(dic)
else:
break
num += 1
except TwitterSearchException as e:
print(e)
return dt
if __name__ == '__main__':
cred = get_creds()
ls = ['machine', 'learning']
limit = 10
obj = TwitSearch(cred, ls, limit)
data = obj.search()
內容解密:
- 定義
TwitSearch類別:用於使用 TwitterSearch API 搜尋推文。 search方法:根據設定的關鍵字和語言搜尋推文,並將結果儲存到列表中。- 錯誤處理:捕捉
TwitterSearchException異常並列印錯誤訊息。
這些範例展示瞭如何使用 MongoDB 的聚合管道進行資料處理,以及如何使用 TwitterSearch API 從 Twitter 擷取資料。透過這些技術,可以高效地處理和分析大量資料。