隨著越來越多的CT掃描儲存函式庫上線並尋求AI研究人員協助對抗疫情,我開發了一種完全根據開放原始碼元件的CT掃描降噪方法。這套方案整合了Python、Apache Spark、Apache Mahout(專注於分散式線性代數的Spark函式庫)以及Kubeflow等工具,為醫學影像處理提供了高效的解決方案。

在本文中,我不會探討背後的數學原理,而是專注於如何使用Kubeflow實作這一技術。這為研究人員提供了一個可擴充套件的管線,可以自由增加後續處理步驟並且其他研究者分享。

北美放射學會(RSNA)計劃很快發布一個COVID-19 CT掃描儲存函式庫,這使得開發通用的處理工具變得更加迫切。

使用Python進行資料準備

DICOM檔案處理基礎

CT掃描影像通常以DICOM格式儲存。在這種格式中,每個"切片"都儲存在單獨的檔案中,同時包含一些中繼資料,如畫素間距和切片間距。我們的第一步是讀取所有這些檔案,並建立一個畫素值的3D張量。然後將該張量"扁平化"為二維矩陣,以便後續執行奇異值分解。

您可以從多個來源取得DICOM檔案集:

  • coronacases.org(雖然下載DICOMs可能有些棘手)
  • 公共肺部影像資料函式庫
  • 如果您曾經做過CT掃描,醫生可能給您的光碟
  • 其他線上資源

重要的是,我們需要一個包含單個CT掃描的DICOM檔案目錄。在本例中,我假設在/data/dicom目錄中存在一組DICOM檔案。

轉換DICOM為張量

將DICOM影像轉換為張量出乎意料地簡單,前提是您擁有正確的依賴項。我使用pydicom,這是一個廣受支援的Python介面,用於處理DICOM影像。不過,pydicom的Docker映像不包含Grassroots DICOM (GDCM),而這對於將DICOM轉換為畫素陣列是必需的。

我解決這個問題的方法是使用pydicom Docker容器作為基礎映像,然後構建相容的GDCM版本。最終的映像我命名為rawkintrevo/covid-prep-dicom。有了pydicom和GDCM,將DICOM影像轉換為張量變得相當容易。以下是使用輕量級Python函式執行此操作的方法:

def dicom_to_matrix(input_dir: str, output_file: str) -> output_type:
    import pydicom
    import numpy as np
    from os import listdir
    
    def dicom_to_tensor(path):
        dicoms = [pydicom.dcmread(f"{path}/{f}") for f in listdir(path)]
        slices = [d for d in dicoms if hasattr(d, "SliceLocation")]
        slices = sorted(slices, key=lambda s: s.SliceLocation)
        
        img_shape = list(slices[0].pixel_array.shape)
        img_shape.append(len(slices))
        img3d = np.zeros(img_shape)
        
        for i, s in enumerate(slices):
            img2d = s.pixel_array
            img3d[:, :, i] = img2d
            
        return {"img3d": img3d, "img_shape": img_shape}
    
    m = dicom_to_tensor(f"{input_dir}")
    np.savetxt(output_file, m['img3d'].reshape((-1,m['img_shape'][2])), delimiter=",")
    return None

dicom_to_matrix_op = comp.func_to_container_op(
    dicom_to_matrix,
    base_image='rawkintrevo/covid-prep-dicom:0.8.0.0')

這段程式碼實作了將DICOM檔案轉換為數值矩陣的過程:

  1. 匯入必要的函式庫:我們在函式內部匯入模組(而非全域),這是Kubeflow中的最佳實踐,確保容器操作正確封裝所有依賴。

  2. dicom_to_tensor內部函式:

    • 讀取指定路徑中的所有DICOM檔案
    • 篩選出具有SliceLocation屬性的切片(這是識別有效CT切片的關鍵)
    • SliceLocation對切片進行排序,確保3D重建的正確順序
    • 建立一個空的3D陣列,其形狀根據第一個切片的尺寸加上切片數量
    • 遍歷每個切片,將2D畫素陣列填入3D張量的對應位置
  3. 主函式流程:

    • 呼叫內部函式處理輸入目錄中的DICOM檔案
    • 使用numpy將3D張量重塑為2D矩陣(每列代表一個切片)
    • 將結果儲存為CSV格式的文字檔案
  4. 最後,將此函式轉換為Kubeflow容器操作,使其可以在工作流程中使用

這種方法的優勢在於它簡潔地處理了複雜的DICOM檔案結構,並將3D醫學影像轉換為可用於後續分析的標準數值格式。

使用Apache Spark和Mahout實作DS-SVD

分散式隨機奇異值分解(DS-SVD)的數學原理相當複雜,這裡我們專注於實作方法。簡單來說,我們希望將CT掃描分解為一組特徵,然後丟棄最不重要的特徵,因為這些很可能是噪音。

Apache Mahout的一個重要特性是其類別似R語言的領域特定語言,這使得用Scala編寫的數學程式碼易於閱讀。以下是使用Spark和Mahout對CT掃描進行分解的過程:

val pathToMatrix = "gs://covid-dicoms/s.csv"
val voxelRDD:DrmRdd[Int] = sc.textFile(pathToMatrix)
  .map(s => dvec( s.split(",")
    .map(f => f.toDouble)))
  .zipWithIndex
  .map(o => (o._2.toInt, o._1))

val voxelDRM = drmWrap(voxelRDD)

// k, p, q 應該都是命令列引數
// k 是輸出的階(rank),例如我們想要的特徵臉數量
// p 是過取樣引數
// q 是額外冪迭代的數量
// 閱讀 https://mahout.apache.org/users/dim-reduction/ssvd.html
val k = args(0).toInt
val p = args(1).toInt
val q = args(2).toInt

val(drmU, drmV, s) = dssvd(voxelDRM.t, k, p, q)

val V = drmV.checkpoint().rdd.saveAsTextFile("gs://covid-dicoms/drmV")
val U = drmU.t.checkpoint().rdd.saveAsTextFile("gs://covid-dicoms/drmU")
sc.parallelize(s.toArray,1).saveAsTextFile("gs://covid-dicoms/s")

這段Scala程式碼實作了使用Apache Mahout進行分散式奇異值分解的過程:

  1. 資料載入:

    • 從Google Cloud Storage讀取之前生成的CSV矩陣
    • 將每行轉換為分散式向量(dvec)
    • 為每行增加索引,然後重新對映為(索引, 向量)對
    • 將RDD包裝為Mahout的分散式行矩陣(DRM)
  2. 引數設定:

    • k:要保留的奇異值數量,決定了降噪後的特徵數
    • p:過取樣引數,增加計算穩定性
    • q:額外冪迭代次數,提高近似精確度
  3. 執行DS-SVD:

    • 對矩陣的轉置執行dssvd操作
    • 得到三個結果矩陣:drmU, drmV, 和s(對角奇異值)
  4. 儲存結果:

    • 將三個矩陣儲存到雲端儲存中,以便後續處理
    • 使用checkpoint()確保RDD計算完成並持久化

這段程式碼的強大之處在於它能夠利用Spark的分散式計算能力處理大型CT掃描資料,而Mahout的DS-SVD演算法則提供了高效的降噪能力。僅用幾行Scala程式碼,我們就實作了一個外核(out-of-core)奇異值分解。

視覺化

R和Python都有許多優秀的視覺化函式庫,我們可以使用其中之一來視覺化我們的降噪DICOM。我們也希望將最終影像儲存到比永續性儲存區容器(PVC)更持久的地方,以便日後檢視。

視覺化階段將包含三個步驟:

  1. 下載DS-SVD產生的DRM
  2. 透過將矩陣s的某些對角值設為零來重組矩陣,從而降噪
  3. 視覺化渲染結果DICOM的切片

雖然R和Python都可以輕鬆完成視覺化,但我選擇使用Python,因為Google官方支援用於與Cloud Storage互動的Python API。

下載分散式行矩陣(DRM)

回想一下,DRM實際上只是RDD的包裝器。在雲端儲存桶中,它將表示為充滿矩陣"部分"的目錄。以下是從Google Cloud Storage下載這些檔案的輔助函式:

def download_folder(bucket_name = 'your-bucket-name',
                   bucket_dir = 'your-bucket-directory/',
                   dl_dir= "local-dir/"):
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=bucket_dir)  # 取得檔案列表
    for blob in blobs:
        filename = blob.name.replace('/', '_')
        blob.download_to_filename(dl_dir + filename)  # 下載

這個函式使用Google Cloud Storage客戶端API下載整個目錄的內容:

  1. 初始化儲存客戶端並取得指定的儲存桶
  2. 列出以特定字首開頭的所有檔案(blobs)
  3. 遍歷每個檔案,將其路徑中的斜槓替換為下劃線(避免本地檔案系統問題)
  4. 將每個檔案下載到指定的本地目錄

這種方法特別適合處理Mahout生成的分散式矩陣,因為它們通常被分割成多個部分檔案。

在撰寫本文時,Mahout與Python的整合相當有限(沒有與此程式碼等效的PySpark版本)。同樣,也沒有將Mahout DRM讀入Python NumPy陣列的輔助函式,因此我們需要編寫另一個輔助函式來協助我們:

def read_mahout_drm(path):
    data = {}
    counter = 0
    parts = [p for p in os.listdir(path) if "part"]
    for p in parts:
        with open(f"{path}/{p}", 'r') as f:
            lines = f.read().split("\n")
            for l in lines[:-1]:
                counter +=1
                t = literal_eval(l)
                arr = np.array([t[1][i] for i in range(len(t[1].keys()))])
                data[t[0]] = arr
    print(f"read {counter} lines from {path}")
    return data

這個函式處理Mahout DRM的特殊格式,將其轉換為Python字典:

  1. 初始化一個空字典和計數器
  2. 取得目錄中所有名稱包含"part"的檔案(這是Spark儲存RDD的標準命名方式)
  3. 對於每個部分檔案:
    • 讀取檔案內容並按行分割
    • 對每行(除了最後一行,它可能是空的):
      • 使用literal_eval安全地將文字轉換為Python物件
      • 從評估結果中提取數值陣列
      • 將陣列儲存在字典中,以原始行索引為鍵
  4. 報告讀取的行數並回傳資料字典

這個函式的關鍵在於它處理了Mahout DRM的特殊序列化格式,並將其轉換為Python可用的資料結構。由於大多數Mahout DRM都分散在多個檔案中,我們必須迭代所有部分來重建完整矩陣。

重組矩陣為降噪影像

在奇異值分解中,奇異值對角矩陣通常用希臘字母σ(sigma)表示。在我們的程式碼中,我們使用字母s。按照慣例,這些值通常按照從最重要到最不重要的順序排列,而Mahout的實作也遵循這一慣例。

要降噪影像,我們只需將對角線的最後幾個值設為零。這背後的理念是最不重要的基向量可能代表我們希望去除的噪聲。實際上,這種方法非常有效,因為奇異值分解能夠將資料分解為主要特徵和次要特徵,而噪聲通常表現為低能量的次要特徵。

在處理醫學影像時,這種降噪方法特別有用,因為它保留了關鍵的解剖結構,同時減少了可能幹擾診斷的雜訊。相比於傳統的濾波方法,這種根據SVD的技術能更好地保留影像的精細結構。

完整工作流程整合

將上述所有步驟整合到Kubeflow工作流程中,我們可以建立一個端對端的CT掃描降噪管線。這種方法的優勢在於:

  1. 可重複性:整個流程被封裝在容器中,確保在任何環境中都能一致執行
  2. 可擴充套件性:可以輕鬆處理多個CT掃描,利用Spark的分散式計算能力
  3. 模組化:每個步驟都是獨立的,可以單獨改進或替換
  4. 開放原始碼:所有元件都是開放原始碼的,使研究人員能夠自由修改和擴充套件

這種方法特別適合在疫情期間協助醫學研究人員處理大量CT掃描資料,提高診斷效率和準確性。

實際應用與擴充套件

這個CT掃描降噪管線可以擴充套件用於多種應用場景:

臨床診斷輔助

降噪後的CT掃描可以幫助放射科醫生更清晰地識別肺部感染的早期跡象。在COVID-19診斷中,這一點尤為重要,因為早期感染可能表現為微妙的磨玻璃影,容易被噪聲掩蓋。

機器學習前處理

對於構建自動診斷系統,高品質的輸入資料至關重要。這個降噪管線可以作為機器學習管線的前處理步驟,提高後續分類別或分割模型的效能。

研究資料分享

標準化的降噪流程使不同來源的CT掃描資料更具可比性,有助於研究人員在全球範圍內分享和比較結果。

效能最佳化考量

在處理大量CT掃描時,以下幾點效能最佳化值得考慮:

  1. 調整Spark引數:根據叢集大小和可用資源調整Spark執行器數量和記憶體分配
  2. SVD引數選擇:k, p, q引數的選擇對計算效率和結果品質有顯著影響
  3. 儲存最佳化:考慮使用壓縮格式儲存中間結果,特別是當處理大量掃描時
  4. 批處理策略:開發批處理策略以有效處理多個CT掃描

這個開放原始碼CT掃描降噪管線還有多個可能的發展方向:

  1. 自適應降噪:開發自動確定最佳奇異值截斷點的演算法
  2. 多模態整合:擴充套件管線以處理CT、MRI等多種醫學影像格式
  3. 實時處理:最佳化演算法以支援近實時的CT掃描降噪
  4. 分割整合:將降噪步驟與自動分割演算法整合,直接識別感染區域

透過使用開放原始碼工具構建這個CT掃描降噪管線,我希望能為醫學影像處理領域提供一個可擴充套件、可重用的解決方案。這種方法不僅適用於當前的疫情應對,也可以應用於更廣泛的醫學影像處理任務。

透過結合Python的易用性、Spark的分散式計算能力和Mahout的高階數學函式庫,我們能夠建立一個既強大又靈活的資料處理管線,為醫學影像分析提供新的可能性。

CT掃描降噪:從數學原理到實際應用

醫學影像處理一直是人工智慧與醫療結合的重要領域。在醫學影像中,CT掃描是診斷的重要工具,但掃描過程中不可避免會產生雜訊。這些雜訊可能影響醫生的診斷判斷,因此降噪處理便成為提升診斷準確率的關鍵步驟。

在這篇文章中,玄貓將帶領各位實作一個完整的CT掃描降噪管道,從矩陣操作到雲端佈署,一步解析每個環節的技術細節。

降噪的數學基礎:奇異值分解

在深入技術細節前,先來理解CT掃描降噪的數學基礎。我們採用的核心技術是奇異值分解(Singular Value Decomposition, SVD),這是一種強大的矩陣分解方法。

SVD的基本原理是將任何矩陣 A 分解為三個矩陣的乘積:

A = U * Σ * V^T

其中:

  • U 是一個正交矩陣,包含左奇異向量
  • Σ 是一個對角矩陣,包含奇異值
  • V^T 是另一個正交矩陣的轉置,包含右奇異向量

在影像處理中,我們可以將CT掃描視為一個大型矩陣。透過SVD分解後,奇異值從大到小排列,代表了不同頻率的訊息。較大的奇異值通常對應影像的主要結構,而較小的奇異值往往對應噪聲。因此,透過將小奇異值設為零,然後重建矩陣,我們可以有效地降低噪聲。

多張影像處理的迴圈實作

首先,讓我們看一段用於處理多個降噪級別影像的程式碼:

percs = [0.001, 0.01, 0.05, 0.1, 0.3]
for p in range(len(percs)):
    perc = percs[p]
    diags = [diags_orig[i]
             if i < round(len(diags) - (len(diags) * perc))
             else 0
             for i in range(len(diags))]
    recon = drmU_p5 @ np.diag(diags) @ drmV_p5.transpose()
    composite_img = recon.transpose().reshape((512,512,301))
    a1 = plt.subplot(1,1,1)
    plt.imshow(composite_img[:, :, 150], cmap=plt.cm.bone)
    plt.title(f"{perc*100}% denoised. (k={len(diags)}, oversample=15, power_iters=2)")
    a1.set_aspect(1.0)
    plt.axis('off')
    fname = f"{100-(perc*100)}%-denoised-img.png"
    plt.savefig(f"/tmp/{fname}")
    upload_blob(bucket_name, f"/tmp/{fname}", f"/output/{fname}")

這段程式碼是降噪處理的核心,它透過迴圈產生不同程度的降噪效果:

  1. percs陣列定義了我們要嘗試的降噪百分比,從0.1%到30%不等。
  2. 迴圈中,我們根據每個百分比值,將一定比例的最小奇異值設為零:
    • if i < round(len(diags) - (len(diags) * perc)) 這一行確保只有最後p%的奇異值被設為零
  3. 使用矩陣乘法運算元@重建影像矩陣:recon = drmU_p5 @ np.diag(diags) @ drmV_p5.transpose()
  4. 重建後的矩陣被重塑為原始CT掃描的形狀(512,512,301),其中301是切片數量
  5. 我們選取第150個切片進行視覺化,並使用骨骼色彩對映(plt.cm.bone)來顯示
  6. 最後將處理後的影像儲存為PNG檔案,並上載到雲端儲存桶

這種漸進式的降噪處理讓醫療專業人員可以比較不同降噪程度的效果,選擇最佳的平衡點。在實務中,我發現0.5%到5%的降噪通常能提供最佳結果,保留診斷所需的細節同時去除大部分噪聲。

建立CT掃描降噪管道

完整的CT掃描降噪流程需要多個步驟協同工作。在雲端環境中,我們可以使用Kubeflow建立一個端對端的處理管道。下面讓我們逐步解析這個管道的建立過程。

Spark操作清單

首先,我們需要定義Spark任務的清單(manifest)。由於矩陣分解運算量大,使用Spark可以有效地分散計算負載:

container_manifest = {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "kind": "SparkApplication",
    "metadata": {
        "name": "spark-app",
        "namespace": "kubeflow"
    },
    "spec": {
        "type": "Scala",
        "mode": "cluster",
        "image": "docker.io/rawkintrevo/covid-basis-vectors:0.2.0",
        "imagePullPolicy": "Always",
        "hadoopConf": {
            "fs.gs.project.id": "kubeflow-hacky-hacky",
            "fs.gs.system.bucket": "covid-dicoms",
            "fs.gs.impl" : "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
            "google.cloud.auth.service.account.enable": "true",
            "google.cloud.auth.service.account.json.keyfile": "/mnt/secrets/user-gcp-sa.json",
        },
        "mainClass": "org.rawkintrevo.covid.App",
        "mainApplicationFile": "local:///covid-0.1-jar-with-dependencies.jar",
        "arguments": ["245", "15", "1"],
        "sparkVersion": "2.4.5",
        "restartPolicy": {
            "type": "Never"
        },
        "driver": {
            "cores": 1,
            "secrets": [
                {"name": "user-gcp-sa",
                 "path": "/mnt/secrets",
                 "secretType": "GCPServiceAccount"
                }
            ],
            "coreLimit": "1200m",
            "memory": "512m",
            "labels": {
                "version": "2.4.5",
            },
            "serviceAccount": "spark-operatoroperator-sa",
        },
        "executor": {
            "cores": 1,
            "secrets": [
                {"name": "user-gcp-sa",
                 "path": "/mnt/secrets",
                 "secretType": "GCPServiceAccount"
                }
            ],
            "instances": 4,
            "memory": "4084m"
        },
        "labels": {
            "version": "2.4.5"
        },
    }
}

這個Spark清單定義瞭如何在Kubernetes叢集中執行SVD運算:

  1. 基本設定

    • 使用Scala作為執行語言
    • 以cluster模式執行,確保計算分佈在多個節點
    • 指定使用的Docker映像檔及版本
  2. 雲端儲存設定

    • 設定Google Cloud Storage連線引數
    • 指定認證方式及金鑰檔案位置
  3. 執行引數

    • 主類別為org.rawkintrevo.covid.App
    • 傳入引數["245", "15", "1"],分別代表:
      • 245: 使用的特徵數量
      • 15: 過取樣率
      • 1: 冪次迭代次數
  4. 資源設定

    • 驅動程式:1個核心,512MB記憶體
    • 執行程式:4個執行器,每個1個核心和4084MB記憶體

在實務中,我常根據資料大小調整這些引數。例如,處理高解析度CT掃描時,可能需要增加執行器數量至8或更多,以加速計算過程。

值得注意的是,這個設定是針對Google Cloud Platform(GCP)的,但只需修改少量設定,特別是關於金鑰的部分,即可輕鬆移植到AWS或Azure平台。

完整的降噪管道定義

有了Spark任務清單,接下來我們可以定義完整的處理管道:

from kfp.gcp import use_gcp_secret

@kfp.dsl.pipeline(
    name="Covid DICOM Pipe v2",
    description="Visualize Denoised CT Scans"
)
def covid_dicom_pipeline():
    vop = kfp.dsl.VolumeOp(
        name="requisition-PVC",
        resource_name="datapvc",
        size="20Gi",
        modes=kfp.dsl.VOLUME_MODE_RWO
    )
    
    step1 = kfp.dsl.ContainerOp(
        name="download-dicom",
        image="rawkintrevo/download-dicom:0.0.0.4",
        command=["/run.sh"],
        pvolumes={"/data": vop.volume}
    )
    
    step2 = kfp.dsl.ContainerOp(
        name="convert-dicoms-to-vectors",
        image="rawkintrevo/covid-prep-dicom:0.9.5",
        arguments=[
            '--bucket_name', "covid-dicoms",
        ],
        command=["python", "/program.py"],
        pvolumes={"/mnt/data": step1.pvolume}
    ).apply(kfp.gcp.use_gcp_secret(secret_name='user-gcp-sa'))
    
    rop = kfp.dsl.ResourceOp(
        name="calculate-basis-vectors",
        k8s_resource=container_manifest,
        action="create",
        success_condition="status.applicationState.state == COMPLETED"
    ).after(step2)
    
    pyviz = kfp.dsl.ContainerOp(
        name="visualize-slice-of-dicom",
        image="rawkintrevo/visualize-dicom-output:0.0.11",
        command=["python", "/program.py"],
        arguments=[
            '--bucket_name', "covid-dicoms",
        ],
    ).apply(kfp.gcp.use_gcp_secret(secret_name='user-gcp-sa')).after(rop)
    
kfp.compiler.Compiler().compile(covid_dicom_pipeline,"dicom-pipeline-2.zip")
client = kfp.Client()
my_experiment = client.create_experiment(name='my-experiments')
my_run = client.run_pipeline(my_experiment.id, 'my-run1', 'dicom-pipeline-2.zip')

這段程式碼定義了一個完整的Kubeflow管道,包含四個主要步驟:

  1. 資料準備

    • 首先建立一個永續性儲存區(PVC)用於儲存CT掃描資料
    • 容量設為20GB,確保足夠空間處理多個DICOM檔案
  2. 資料下載

    • 使用自訂容器從GCP儲存桶下載DICOM檔案
    • 將檔案儲存到先前建立的永續性儲存區中
  3. 資料轉換

    • 將DICOM檔案轉換為矩陣格式
    • 上載轉換後的矩陣到指定的GCP儲存桶
    • 使用GCP金鑰進行身份驗證
  4. SVD計算

    • 使用前面定義的Spark清單建立ResourceOp
    • 執行分散式奇異值分解
    • 設定成功條件確保任務完成才進行下一步
  5. 視覺化處理

    • 最後一步將處理後的矩陣重建為影像
    • 產生多個不同降噪程度的影像切片
    • 將結果上載回GCP儲存桶

最後三行程式碼將管道編譯為zip檔案,並使用Kubeflow客戶端建立實驗並執行管道。

在實際佈署中,我通常會將這個管道設定為定時執行,處理新上載的CT掃描。這種自動化流程大幅提高了醫學影像處理的效率,讓醫師能夠更快獲得高品質的診斷影像。