隨著資料量的爆炸性增長,有效地處理和分析大資料變得至關重要。本文以廣泛使用的 MovieLens 資料集為例,探討如何運用 Python 和 MongoDB 處理和分析大資料。首先,我們會使用 Python 對原始資料進行前處理,包含讀取、清理和格式轉換,將其轉換為易於處理的 JSON 格式。接著,示範如何將處理後的資料匯入 MongoDB 資料函式庫,並利用 MongoDB 的聚合管道功能進行高效的資料查詢和分析,例如計算電影平均評分、觀眾平均年齡等統計資料。此外,文章也提供直接將原始 JSON 資料儲存至 MongoDB 的方法,以及如何使用 Twitter API 擷取資料的程式碼範例,展現更全面的資料處理流程。

大資料處理與分析實務:以MovieLens資料集為例

大資料是21世紀的熱門話題。本文將以GroupLens提供的MovieLens資料集為例,展示如何處理和分析大資料。MovieLens資料集包含了約一百萬條評分記錄,涵蓋了六千名使用者對四千部電影的評價。

資料前處理

首先,我們需要對資料進行前處理,包括讀取、清理和格式化。以下程式碼展示瞭如何讀取MovieLens資料集並將其轉換為JSON格式:

import json, csv

def read_dat(h, f):
    return csv.DictReader((line.replace('::', ':') for line in open(f)), delimiter=':', fieldnames=h, quoting=csv.QUOTE_NONE)

def gen_dict(d):
    for row in d:
        yield dict(row)

def dump_json(f, l, d):
    f = open(f, 'w')
    f.write('[')
    for i, row in enumerate(d):
        j = json.dumps(row)
        f.write(j)
        if i < l - 1:
            f.write(',')
    else:
        f.write(']')
    f.close()

def read_json(f):
    with open(f) as f:
        return json.load(f)

def display(n, f):
    for i, row in enumerate(f):
        if i < n:
            print(row)
    print()

if __name__ == "__main__":
    u_dat = 'data/ml-1m/users.dat'
    m_dat = 'data/ml-1m/movies.dat'
    r_dat = 'data/ml-1m/ratings.dat'
    unames = ['user_id', 'gender', 'age', 'occupation', 'zip']
    mnames = ['movie_id', 'title', 'genres']
    rnames = ['user_id', 'movie_id', 'rating', 'timestamp']

    # 讀取資料集並計算大小
    users = read_dat(unames, u_dat)
    ul = len(list(gen_dict(users)))
    movies = read_dat(mnames, m_dat)
    ml = len(list(gen_dict(movies)))
    ratings = read_dat(rnames, r_dat)
    rl = len(list(gen_dict(ratings)))

    print('資料集大小:')
    print('使用者', ul)
    print('電影', ml)
    print('評分', rl)

    # 將資料集轉換為JSON格式
    users = read_dat(unames, u_dat)
    users = gen_dict(users)
    movies = read_dat(mnames, m_dat)
    movies = gen_dict(movies)
    ratings = read_dat(rnames, r_dat)
    ratings = gen_dict(ratings)

    uf = 'data/users.json'
    dump_json(uf, ul, users)
    mf = 'data/movies.json'
    dump_json(mf, ml, movies)
    rf = 'data/ratings.json'
    dump_json(rf, rl, ratings)

    # 驗證資料
    u = read_json(uf)
    m = read_json(mf)
    r = read_json(rf)
    n = 1
    display(n, u)
    display(n, m)
    display(n, r)

程式碼解密:

  1. read_dat函式:讀取MovieLens資料集中的.dat檔案,將雙冒號(::)替換為單冒號(:),並使用csv.DictReader讀取資料。
  2. gen_dict函式:將OrderedDict列表轉換為普通的字典列表,以便於後續處理。
  3. dump_json函式:將資料寫入JSON檔案中。
  4. read_json函式:讀取JSON檔案中的資料。
  5. display函式:顯示前n條資料記錄,用於驗證資料是否正確。

資料清理與轉換

在取得JSON格式的資料後,我們需要對資料進行進一步的清理和轉換。以下程式碼展示瞭如何清理電影資料集:

import json, numpy as np

def read_json(f):
    with open(f) as f:
        return json.load(f)

def dump_json(f, d):
    with open(f, 'w') as fout:
        json.dump(d, fout)

def display(n, d):
    [print(row) for i, row in enumerate(d) if i < n]

def get_indx(k, d):
    return [row[k] for row in d if 'null' in row]

def get_data(k, l, d):
    return [row for i, row in enumerate(d) if row[k] in l]

def get_unique(key, d):
    s = set()
    for row in d:
        for k, v in row.items():
            if k in key:
                s.add(v)
    return np.sort(list(s))

if __name__ == "__main__":
    mf = 'data/movies.json'
    m = read_json(mf)
    
    # 清理電影資料
    indx = get_indx('movie_id', m)
    for row in m:
        if row['movie_id'] in indx:
            # 處理含有null值的記錄
            row['title'] = row['title'] + ':' + row['genres']
            row['genres'] = row['null'][0]
            del row['null']
        title = row['title'].split(" ")
        year = title.pop()
        year = ''.join(c for c in year if c not in '()')
        row['title'] = ' '.join(title)
        row['year'] = year

    # 顯示清理後的資料
    n = 2
    display(n, m)

    # 取得年份的唯一值
    s = get_unique('year', m)
    print('\n', s, '\n')

    # 清理特定記錄
    rec = get_data('year', ['Assignment'], m)
    print(rec[0])

    # 儲存清理後的資料
    mf = 'data/cmovies.json'
    dump_json(mf, m)

程式碼解密:

  1. get_indx函式:取得含有特定鍵(null)的記錄索引。
  2. get_data函式:根據特定條件篩選資料。
  3. get_unique函式:取得特定欄位中的唯一值。
  4. 清理電影資料:處理含有null值的記錄,並將電影標題和年份分離。

連線MongoDB並進行分析

在完成資料清理後,我們可以將資料儲存到MongoDB中進行進一步的分析。以下程式碼展示瞭如何連線MongoDB:

class conn:
    from pymongo import MongoClient
    client = MongoClient('localhost', port=27017)

    def __init__(self, dbname):
        self.db = conn.client[dbname]

    def getDB(self):
        return self.db

程式碼解密:

  1. conn類別:建立與MongoDB的連線,並提供取得資料函式庫的方法。

資料探索與MongoDB資料儲存實務

本章節主要探討如何使用Python處理JSON資料集,並將處理後的資料儲存至MongoDB資料函式庫中。文中提供了兩個程式碼範例,分別展示瞭如何進行資料探索、資料處理,以及如何將處理後的資料儲存至MongoDB。

資料處理與探索

首先,我們來看看第一個程式碼範例,該範例展示瞭如何讀取JSON資料集、進行資料處理,以及計算特定電影的平均評分、觀眾平均年齡等統計資訊。

程式碼解析

def get_info(*args):
    a = [arg for arg in args]
    ratings = [int(row[a[0][1]]) for row in a[2] if row[a[0][0]] == a[1]]
    uids = [row[a[0][3]] for row in a[2] if row[a[0][0]] == a[1]]
    title = [row[a[0][2]] for row in a[3] if row[a[0][0]] == a[1]]
    age = [int(row[a[0][4]]) for col in uids for row in a[4] if col == row[a[0][3]]]
    gender = [row[a[0][5]] for col in uids for row in a[4] if col == row[a[0][3]]]
    return (ratings, title[0], uids, age, gender)

def generate(k, v, r, m, u):
    for i, mid in enumerate(v):
        dic = {}
        rec = get_info(k, mid, r, m, u)
        dic = {'_id': i, 'mid': mid, 'title': rec[1], 'avg_rating': np.mean(rec[0]),
               'n_ratings': len(rec[0]), 'avg_age': np.mean(rec[3]),
               'M': rec[4].count('M'), 'F': rec[4].count('F')}
        dic['avg_rating'] = round(float(str(dic['avg_rating'])[:6]), 2)
        dic['avg_age'] = round(float(str(dic['avg_age'])[:6]))
        yield dic

def gen_ls(g):
    for i, row in enumerate(g):
        yield row

內容解密:

  1. get_info函式:該函式接收多個引數,包括鍵值列表、電影ID、評分資料集、電影資料集和使用者資料集。它透過列表推導式提取特定電影的評分、使用者ID、電影標題、使用者年齡和性別等資訊。
  2. generate函式:該函式為每個電影ID生成一個包含電影統計資訊的字典,如平均評分、評分人數、平均年齡和男女比例等。它利用get_info函式取得原始資料,並進行計算和格式化。
  3. gen_ls函式:該函式是一個生成器,用於遍歷由generate函式產生的字典序列。

資料儲存至MongoDB

接下來的程式碼展示瞭如何將處理後的資料儲存至MongoDB資料函式庫。

程式碼解析

if __name__ == "__main__":
    # 讀取JSON資料集
    m = 'data/cmovies.json'
    movies = np.array(read_json(m))
    r = 'data/ratings.json'
    ratings = np.array(read_json(r))
    u = 'data/users.json'
    users = np.array(read_json(u))

    # 建立MongoDB連線
    obj = conn.conn('test')
    db = obj.getDB()

    # 將生成器中的資料儲存至MongoDB
    gen = generate(keys, new_mv, ratings, movies, users)
    gls = gen_ls(gen)
    movie_info = db.movie_info
    movie_info.drop()
    start = clock()
    for row in gls:
        movie_info.insert(row)
    end = clock()
    elapsed_ls = end - start
    print(hf.format_timespan(elapsed_ls, detailed=True))

內容解密:

  1. 讀取資料集:程式首先讀取三個JSON檔案,分別包含電影資訊、評分資料和使用者資訊。
  2. 建立MongoDB連線:透過自定義的conn類別建立與MongoDB的連線,並取得資料函式庫物件。
  3. 儲存資料:利用前面定義的生成器,將處理後的電影統計資訊逐一插入到MongoDB的movie_info集合中。

直接儲存JSON資料至MongoDB

另一個程式碼範例直接將原始JSON資料集儲存至MongoDB。

程式碼解析

def create_db(c, d):
    c = db[c]
    c.drop()
    for i, row in enumerate(d):
        row['_id'] = i
        c.insert(row)

if __name__ == "__main__":
    u = read_json('data/users.json')
    m = read_json('data/cmovies.json')
    r = read_json('data/ratings.json')
    
    obj = conn.conn('test')
    db = obj.getDB()
    
    start = clock()
    create_db('users', u)
    create_db('movies', m)
    create_db('ratings', r)
    end = clock()
    elapsed_ls = end - start
    print(hf.format_timespan(elapsed_ls, detailed=True))

內容解密:

  1. create_db函式:該函式負責建立或更新MongoDB中的集合,並將提供的資料插入其中。
  2. 主程式:讀取三個JSON檔案,將資料儲存至對應的MongoDB集合中,並記錄操作耗時。

綜上所述,本章節透過具體的程式碼範例,詳細介紹瞭如何使用Python處理JSON資料,並將其儲存至MongoDB的過程。這些範例對於理解大資料處理流程和實踐具有重要的參考價值。

使用 MongoDB 進行資料聚合與 Twitter 資料擷取

MongoDB 聚合管道簡介

MongoDB 的聚合管道是一種資料處理框架,模擬資料處理管道的概念。檔案進入多階段管道後,被轉換為聚合結果。除了按特定欄位或欄位分組和排序檔案以及聚合陣列內容外,管道階段還可以使用運算元執行諸如計算平均值或連線字串等任務。聚合管道使用原生 MongoDB 操作高效地進行資料聚合,是 MongoDB 中首選的資料聚合方法。

使用聚合管道比對檔案

以下程式碼範例展示瞭如何使用聚合管道比對檔案:

import sys, os
sys.path.append(os.getcwd() + '/classes')
import conn

def match_item(k, v, d):
    pipeline = [{'$match': {k: v}}]
    q = db.command('aggregate', d, pipeline=pipeline)
    return q

if __name__ == "__main__":
    obj = conn.conn('test')
    db = obj.getDB()
    movie = 'Toy Story'
    q = match_item('title', movie, 'movie_info')
    r = q['result'][0]
    print(movie, 'document:')
    print(r)
    print('average rating', r['avg_rating'], '\n')

內容解密:

  1. 匯入必要的模組:程式碼首先匯入 sysos 和自定義的 conn 類別。
  2. 定義 match_item 函式:該函式使用聚合管道根據給定的鍵值對比對檔案。
  3. 執行聚合操作:在 if __name__ == "__main__": 區塊中,建立與 MongoDB 的連線,並使用 match_item 函式查詢特定電影的檔案。

多階段聚合管道範例

以下是一個多階段聚合管道的範例:

def stages(k, v, r, d):
    pipeline = [
        {'$match': {'$and': [{k: v}, {'rating': {'$eq': r}}]}},
        {'$project': {'_id': 1, 'user_id': 1, 'movie_id': 1, 'rating': 1}},
        {'$limit': 100}
    ]
    q = db.command('aggregate', d, pipeline=pipeline)
    return q

if __name__ == "__main__":
    obj = conn.conn('test')
    db = obj.getDB()
    u = '3'
    r = '5'
    q = stages('user_id', u, r, 'ratings')
    result = q['result']
    print('ratings of', r, 'for user ' + str(u) + ':')
    for i, row in enumerate(result):
        print(row)

內容解密:

  1. 定義 stages 函式:該函式使用一個三階段的聚合管道。
    • 第一階段:根據使用者 ID 和評分比對檔案。
    • 第二階段:投影出需要的欄位。
    • 第三階段:限制傳回的檔案數量。
  2. 執行多階段聚合操作:在主程式區塊中,使用 stages 函式查詢特定使用者的評分,並列印結果。

從 Twitter 擷取資料

要從 Twitter 擷取資料,需要連線到 Twitter Streaming API。首先,需要註冊並取得 API 金鑰、API 金鑰、存取令牌和存取令牌金鑰。

儲存 Twitter 認證資訊到 JSON 檔案

import json

if __name__ == '__main__':
    consumer_key = ''
    consumer_secret = ''
    access_token = ''
    access_encrypted = ''
    data = {}
    data['ck'] = consumer_key
    data['cs'] = consumer_secret
    data['at'] = access_token
    data['ae'] = access_encrypted
    json_data = json.dumps(data)
    with open('data/credentials.json', 'w') as obj:
        obj.write('[\n' + json_data + '\n]')

內容解密:

  1. 匯入 json 模組:用於處理 JSON 資料。
  2. 儲存認證資訊:將 Twitter 認證資訊儲存到 JSON 檔案中,以隱藏敏感資訊。

使用 TwitterSearch API 串流 Twitter 資料

from TwitterSearch import *
import json

class TwitSearch:
    def __init__(self, cred, ls, limit):
        self.cred = cred
        self.ls = ls
        self.limit = limit

    def search(self):
        num = 0
        dt = []
        try:
            tso = TwitterSearchOrder()
            tso.set_keywords(self.ls)
            tso.set_language('en')
            ts = TwitterSearch(
                consumer_key=self.cred[0]['ck'],
                consumer_secret=self.cred[0]['cs'],
                access_token=self.cred[0]['at'],
                access_token_secret=self.cred[0]['ae']
            )
            for tweet in ts.search_tweets_iterable(tso):
                if num <= self.limit:
                    dic = {}
                    dic['_id'] = num
                    dic['tweeter'] = tweet['user']['screen_name']
                    dic['tweet_text'] = tweet['text']
                    dt.append(dic)
                else:
                    break
                num += 1
        except TwitterSearchException as e:
            print(e)
        return dt

if __name__ == '__main__':
    cred = get_creds()
    ls = ['machine', 'learning']
    limit = 10
    obj = TwitSearch(cred, ls, limit)
    data = obj.search()

內容解密:

  1. 定義 TwitSearch 類別:用於使用 TwitterSearch API 搜尋推文。
  2. search 方法:根據設定的關鍵字和語言搜尋推文,並將結果儲存到列表中。
  3. 錯誤處理:捕捉 TwitterSearchException 異常並列印錯誤訊息。

這些範例展示瞭如何使用 MongoDB 的聚合管道進行資料處理,以及如何使用 TwitterSearch API 從 Twitter 擷取資料。透過這些技術,可以高效地處理和分析大量資料。