在資料科學專案中,有效管理和分享資料至關重要。本文將介紹如何利用 Redis 和 MongoDB 這兩種資料儲存服務來提升資料處理效率和跨 Notebook 協作。首先,我們會逐步說明如何在 Jupyter 環境中安裝 Redis,並示範如何使用 Redis 進行資料分享、追蹤迭代過程,以及傳遞複雜資料結構,如字典和 NumPy 陣列。接著,會引導讀者在 AWS 上設定 MongoDB 服務,其中包含建立 AWS 例項、安裝 Docker、組態 MongoDB,以及使用 pymongo 進行 Python 操作。最後,將結合實務案例,示範如何將 Twitter 串流資料擷取並儲存至 MongoDB,提供讀者一個完整的資料儲存解決方案。

第8章:資料儲存服務

本章將探討在資料科學工作流程中使用Redis和MongoDB等資料儲存服務的實務方法。這些服務不僅能夠提升資料處理的效率,還能實作跨Notebook的資料分享。

在Jupyter Notebook中安裝Redis

首先,我們需要在jupyter/scipy-notebook容器中安裝Python的Redis函式庫。清單8-17展示瞭如何在Jupyter Notebook中使用pip安裝Redis函式庫的過程。

清單8-17:透過Shell命令在Jupyter Notebook中安裝Redis

In [1]: import redis
-----------------------------------------------------------------
ImportError Traceback (most recent call last)
<ipython-input-1-6872e27f77ac> in <module>()
----> 1 import redis
ImportError: No module named 'redis'

In [2]: !pip install redis
Collecting redis
Downloading redis-2.10.5-py2.py3-none-any.whl (60kB)
100% |████████████████████████████████| 61kB 2.5MB/s ta 0:00:01
Installing collected packages: redis
Successfully installed redis-2.10.5

安裝過程詳解:

  1. 首先嘗試匯入redis模組,但由於尚未安裝而引發ImportError
  2. 使用!pip install redis命令在Notebook中執行pip安裝Redis函式庫。
  3. 安裝成功後,確認Redis函式庫已可正常使用。

簡單的Redis範例

安裝完成後,我們可以開始使用Redis服務。Redis允許我們將資料儲存在記憶體中,並在多個程式之間分享這些資料。圖8-4展示瞭如何從兩個不同的Notebook連線到Redis服務。

清單8-18:連線到Redis服務

In [1]: from redis import Redis
from os import environ
REDIS = Redis(host=environ['THIS_REDIS_PORT_6379_TCP_ADDR'])

連線過程詳解:

  1. redis模組匯入Redis類別。
  2. 使用環境變數中的主機地址建立到Redis服務的連線。

跨Notebook分享資料

我們可以透過Redis在不同的Notebook之間分享資料。清單8-19和8-20展示瞭如何在一個Notebook中設定鍵值對,並在另一個Notebook中檢索該值。

清單8-19:在Redis中設定鍵值對

In [2]: REDIS.set('foo', 42)

清單8-20:從Redis取得值

In [2]: REDIS.get('foo')
Out[2]: b'42'

資料分享詳解:

  1. 在一個Notebook中使用REDIS.set()方法設定鍵值對。
  2. 在另一個Notebook中使用REDIS.get()方法檢索該值。
  3. 注意取得的值是位元組字串(bytestring),需要時可進行解碼。

追蹤跨Notebook的迭代過程

Redis還可以用於追蹤迭代過程的進度。清單8-21模擬了一個迭代過程,並在每次迭代時更新Redis中的計數器。清單8-22則展示瞭如何在另一個Notebook中檢索該計數器的當前值。

清單8-21:模擬迭代過程並更新Redis計數器

In [3]: import time
def some_iterative_process():
    time.sleep(1)

In [4]: count = 0
REDIS.set('count', 0)
while count < 30:
    some_iterative_process()
    count = REDIS.incr('count')

清單8-22:從Redis取得計數器值

In [2]: REDIS.get('count')
Out[2]: b'8'

追蹤過程詳解:

  1. 在一個Notebook中執行迭代過程,並在每次迭代時更新Redis中的計數器。
  2. 在另一個Notebook中隨時檢索計數器的當前值,以追蹤進度。

透過JSON傳遞字典

我們可以使用JSON將字典物件儲存在Redis中,並在另一個Notebook中檢索和還原該字典。清單8-23和8-24展示了這個過程。

清單8-23:定義字典並儲存到Redis

In [5]: import numpy as np
import json
model_params = {
    'C': list(np.logspace(-3,3,7)),
    'penalty': 'l1',
    'solver' : 'newton-cg'
}
REDIS.set('model_params', json.dumps(model_params))
Out[5]: True

清單8-24:從Redis載入字典

In [4]: REDIS.get('model_params')
Out[4]: b'{"C": [0.001, 0.01, 0.1, 1.0, 10.0, 100.0, 1000.0], "solver": "newton-cg", "penalty": "l1"}'

In [5]: import json
json.loads(REDIS.get('model_params').decode())
Out[5]: {'C': [0.001, 0.01, 0.1, 1.0, 10.0, 100.0, 1000.0], 'penalty': 'l1', 'solver': 'newton-cg'}

字典傳遞詳解:

  1. 使用json.dumps()將字典轉換為JSON字串並儲存在Redis中。
  2. 從Redis檢索JSON字串後,使用.decode()將其轉換為普通字串。
  3. 使用json.loads()將JSON字串還原為Python字典。

將NumPy陣列作為位元組字串傳遞

NumPy陣列可以方便地轉換為位元組字串並儲存在Redis中。清單8-25和8-26展示瞭如何編碼和儲存NumPy陣列,以及如何載入和解碼。

清單8-25:將NumPy陣列編碼並儲存到Redis

In [6]: import numpy as np
A = np.array([
    [1,1,1],
    [2,2,2],
    [3,3,3]
])
n,m = A.shape
encoded_A = A.ravel().tostring()
REDIS.set('encoded_A', encoded_A)
REDIS.set('A_n', n)
REDIS.set('A_m', m)
Out[6]: True

清單8-26:從Redis載入並解碼NumPy陣列

In [6]: import numpy as np
A_bytestring = REDIS.get('encoded_A')
A_encoded = np.fromstring(A_bytestring, dtype=int)
n = int(REDIS.get('A_n').decode())
m = int(REDIS.get('A_m').decode())
A = A_encoded.reshape(n, m)
A
Out[6]: array([[1, 1, 1],
               [2, 2, 2],
               [3, 3, 3]])

NumPy陣列傳遞詳解:

  1. 使用.ravel()將二維陣列展平為一維向量。
  2. 使用.tostring()將向量轉換為位元組字串並儲存在Redis中。
  3. 同時儲存陣列的維度資訊(n和m)。
  4. 從Redis載入位元組字串後,使用np.fromstring()還原為NumPy陣列。
  5. 使用.reshape()根據儲存的維度資訊還原陣列形狀。

MongoDB簡介

與Redis不同,MongoDB是一個完整的資料函式庫系統,支援更複雜的資料儲存需求。它使用類別似JSON的檔案格式來儲存資料,支援靈活的無結構化資料儲存。

MongoDB的主要特點:

  • 使用類別似JSON的檔案格式儲存資料(BSON)
  • 無需預先定義結構描述(schema-less)
  • 支援豐富的查詢功能

本章介紹瞭如何使用Redis和MongoDB這兩種不同的資料儲存服務來支援資料科學工作流程。透過這些工具,我們可以實作跨Notebook的資料分享和協同工作,大大提升了工作效率。未來,我們可以進一步探索這些技術在更大規模的資料處理和分析任務中的應用潛力。

在AWS上設定MongoDB資料儲存服務

隨著雲端運算的普及,資料儲存服務的設定與管理變得越來越重要。本章節將詳細介紹如何在AWS上設定MongoDB,一種流行的NoSQL資料函式庫。首先,我們將建立一個新的AWS t2.micro例項,然後在該例項上安裝Docker並組態MongoDB。

建立新的AWS t2.micro例項

要開始,我們需要在AWS上建立一個新的t2.micro例項。以下是具體步驟:

  1. 登入AWS EC2儀錶板,選擇“Launch Instance”。
  2. 在“Choose AMI”標籤頁中,選擇Ubuntu Server 16.04。
  3. 在“Choose Instance Type”標籤頁中,選擇t2.micro。
  4. 在“Add Storage”標籤頁中,使用預設的8GB儲存設定。
  5. 在“Configure Security Group”標籤頁中,選擇“Create a new security group”。
    • 確認允許從任何地方透過SSH(埠22)存取例項。
    • 新增一條規則,允許從任何地方透過埠2376存取,以便從Docker Hub提取映像。
    • 新增另一條規則,允許從任何地方透過埠27017存取,這是MongoDB的預設埠。

組態新的AWS t2.micro例項以使用Docker

一旦新的例項啟動並執行,我們就需要組態它以使用Docker。以下是步驟:

  1. 記錄下新組態的AWS例項的IP地址。
  2. 使用該IP地址透過SSH登入例項。
  3. 透過shell指令碼安裝Docker。
  4. 將ubuntu使用者新增到docker群組。
  5. 登出並重新登入SSH會話。
# 登入AWS例項
$ ssh ubuntu@255.255.255.255

# 安裝Docker
$ curl -sSL https://get.docker.com/ | sh

# 將ubuntu使用者新增到docker群組
$ sudo usermod -aG docker ubuntu

提取MongoDB映像

接下來,我們需要從Docker Hub提取MongoDB映像。

# 提取MongoDB映像
$ docker pull mongo

建立和檢視新的資料卷

為了使MongoDB的資料持久化,我們需要建立一個資料卷。

# 建立名為mongo-dbstore的資料卷
$ docker volume create --name mongo-dbstore

# 檢視已建立的資料卷
$ docker volume ls
DRIVER              VOLUME NAME
local               mongo-dbstore
local               redis-dbstore

以持久化服務方式啟動MongoDB

現在,我們可以啟動MongoDB容器並將其連線到我們剛才建立的資料卷。

# 啟動MongoDB容器並對映資料卷
$ docker run -d --name this_mongo -v mongo-dbstore:/data/db -p 27017:27017 mongo
38a2f19d72a09851dc32cb874817a45274e888dd93aca01b5500cbfe9fb9364c

驗證MongoDB安裝

我們可以透過連線到正在執行的MongoDB容器來驗證安裝是否成功。

# 連線到MongoDB容器並執行mongo命令列客戶端
$ docker exec -it this_mongo mongo

# 插入一個檔案到test集合中
> db.test.insert({"foo":1})
WriteResult({ "nInserted" : 1 })

# 查詢剛才插入的檔案
> db.test.find()
{ "_id" : ObjectId("591a00ee33e4717a80d8c92d"), "foo" : 1 }

內容解密:

  • docker exec -it this_mongo mongo:此命令用於連線到正在執行的MongoDB容器,並啟動mongo命令列客戶端。
  • db.test.insert({"foo":1}):此命令在名為test的集合中插入一個JSON檔案{"foo":1}
  • db.test.find():此命令查詢test集合中的所有檔案。

在Jupyter中使用MongoDB

要在Jupyter筆記本中使用MongoDB,我們需要安裝Python函式庫pymongo

# 在Jupyter筆記本中安裝pymongo
!pip install pymongo

MongoDB結構簡介

作為一種NoSQL資料函式庫,MongoDB採用了極簡的結構設計。它包含三種主要的實體:

  1. 資料函式庫(Database)
  2. 集合(Collection)
  3. 檔案(Document)

這些實體之間的關係是:資料函式庫包含集合,集合包含檔案。每個檔案都是JSON資料記錄的二進製表示,由鍵值對組成。

使用pymongo連線MongoDB

pymongo是官方推薦的用於Python操作MongoDB的工具。首先,我們需要使用pymongo.MongoClient建立與MongoDB的連線。

from pymongo import MongoClient

# 連線到MongoDB
client = MongoClient('255.255.255.255', 27017)

內容解密:

  • MongoClient('255.255.255.255', 27017):此函式用於建立與指定IP地址和埠號的MongoDB服務的連線。
  • 如果使用預設埠(27017),則無需明確指定埠號。

操作MongoDB資料函式庫和集合

pymongo提供了一種“取得或建立”的機制,用於操作資料函式庫和集合。如果指定的資料函式庫或集合存在,則傳回對其的參照;如果不存在,則建立它並傳回參照。

# 取得或建立一個名為my_database的資料函式庫
db_ref = client.my_database

# 列出所有資料函式庫名稱
client.database_names()

內容解密:

  • client.my_database:此操作取得或建立一個名為my_database的資料函式庫。
  • client.database_names():此方法傳回目前存在的所有資料函式庫名稱列表。

本章節詳細介紹瞭如何在AWS上設定和使用MongoDB,包括建立AWS例項、安裝Docker、組態MongoDB以及在Jupyter中使用MongoDB。透過這些步驟,您可以有效地佈署和管理自己的MongoDB服務。

MongoDB 與 Twitter 串流資料儲存實作

MongoDB 資料函式庫操作基礎

在進行 Twitter 串流資料儲存之前,首先需要了解 MongoDB 的基本操作。以下將詳細介紹如何使用 Python 操作 MongoDB 資料函式庫。

建立資料函式庫與集合參考

首先,需要建立對資料函式庫和集合的參考。以下程式碼展示瞭如何建立參考並顯示現有的資料函式庫和集合。

# 建立集合參考並顯示資料函式庫和集合
coll_ref = db_ref.my_collection
print(client.database_names(), db_ref.collection_names())

插入檔案到集合中

接下來,示範如何建立一個 Python 字典並將其插入到集合中。

# 插入檔案到集合中
sample_doc = {
    "name": "Joshua",
    "message": "Hi!",
    'my_array': [1, 2, 3, 4, 5, 6, 7, 9]
}
result = coll_ref.insert_one(sample_doc)
print(result)

顯示資料函式庫和集合

插入檔案後,再次顯示資料函式庫和集合,以確認是否成功建立。

# 顯示資料函式庫和集合
print(client.database_names(), db_ref.collection_names())

清除集合中的檔案

最後,示範如何清除集合中的所有檔案,並再次確認資料函式庫和集合的狀態。

# 清除集合中的檔案並顯示資料函式庫和集合
coll_ref.drop()
print(client.database_names(), db_ref.collection_names())

程式碼解析

  1. 建立參考:首先建立對特定集合的參考。
  2. 插入檔案:建立一個 Python 字典,並使用 .insert_one() 方法將其插入到集合中。
  3. 顯示狀態:使用 database_names()collection_names() 方法顯示目前的資料函式庫和集合。
  4. 清除檔案:使用 .drop() 方法清除集合中的所有檔案。

將 Twitter 串流資料儲存至 MongoDB

本文將介紹如何使用 Python 將 Twitter 串流資料儲存至 MongoDB。首先,需要取得 Twitter API 的憑證。

取得 Twitter API 憑證

  1. 前往 https://apps.twitter.com 建立新的 Twitter 應用程式。
  2. 填寫應用程式名稱、描述和網站網址。
  3. 同意開發者協定並建立應用程式。
  4. 在「Keys and Access Tokens」標籤頁中取得 Consumer Key、Consumer Secret、Access Token 和 Access Token Secret。

載入 Twitter API 憑證

將取得的憑證載入到 Jupyter Notebook 中。

# 載入 Twitter API 憑證
CONSUMER_KEY = "你的 Consumer Key"
CONSUMER_SECRET = "你的 Consumer Secret"
ACCESS_TOKEN = "你的 Access Token"
ACCESS_SECRET = "你的 Access Token Secret"

安裝 Twitter 程式函式庫

使用 pip 安裝 Python 的 Twitter 程式函式庫。

# 安裝 Twitter 程式函式庫
!pip install twitter

初始化 Twitter OAuth 物件

使用載入的憑證初始化 twitter.OAuth 物件。

# 初始化 Twitter OAuth 物件
from twitter import OAuth
oauth = OAuth(ACCESS_TOKEN, ACCESS_SECRET, CONSUMER_KEY, CONSUMER_SECRET)

蒐集特定地理位置的推文

使用 Twitter 的 Public Stream 蒐集特定地理位置的推文。首先,需要定義一個地理圍欄(bounding box)。

# 定義地理圍欄(以洛杉磯為例)
los_angeles_bbox = "-118.551346,33.96666,-118.443428,34.05056"

然後,初始化 twitter.TwitterStream 物件並建立一個迭代器來蒐集推文。

# 初始化 TwitterStream 物件
from twitter import TwitterStream
twitter_stream = TwitterStream(auth=oauth)
twitterator = twitter_stream.statuses.filter(locations=los_angeles_bbox)

蒐集推文並儲存至 MongoDB

使用迭代器蒐集推文,並將其儲存至 MongoDB。

# 蒐集推文並儲存至 MongoDB
for tweet in twitterator:
    # 將推文儲存至 MongoDB
    coll_ref.insert_one(tweet)
    print(tweet)