良好的專案結構對於程式碼維護和擴充性至關重要,本文將會詳細介紹如何有效組織程式碼、組態 Docker 環境,以及撰寫測試案例。同時也將探討如何使用 CSV 檔案和 Django ORM 實作 Repository 和 Unit of Work 模式,確保資料的持久化和一致性。最後,將會介紹如何在 Django ORM 類別上實作自定義方法,以在領域模型和 Django ORM 物件之間進行轉換,簡化程式碼邏輯並提升程式碼可讀性。這能確保專案結構清晰,程式碼易於維護,並能有效地整合資料函式庫操作。
專案結構與組態
在開發過程中,專案的結構與組態對於維護性和可擴充套件性至關重要。本文將詳細介紹如何組織原始碼、組態Docker環境、以及撰寫測試。
原始碼組織
將原始碼置於src資料夾下,並利用setup.py使其可透過pip安裝。以下為一個基本的src資料夾結構:
src/
├── allocation
│ ├── config.py
│ └── ...
└── setup.py
其中,allocation資料夾定義了一個頂層模組,而setup.py則用於使專案可被安裝。
setup.py範例
from setuptools import setup
setup(
name='allocation',
version='0.1',
packages=['allocation'],
)
Dockerfile組態
Dockerfile用於定義Docker映像的構建過程。以下是一個範例Dockerfile,用於構建一個Python應用程式的映像:
FROM python:3.8-alpine
# 安裝系統級依賴
RUN apk add --no-cache --virtual .build-deps gcc postgresql-dev musl-dev python3-dev
RUN apk add libpq
# 安裝Python依賴
COPY requirements.txt /tmp/
RUN pip install -r /tmp/requirements.txt
# 清理構建依賴
RUN apk del --no-cache .build-deps
# 複製原始碼並安裝
RUN mkdir -p /src
COPY src/ /src/
RUN pip install -e /src
# 組態環境變數與啟動命令
WORKDIR /src
ENV FLASK_APP=allocation/entrypoints/flask_app.py FLASK_DEBUG=1 PYTHONUNBUFFERED=1
CMD flask run --host=0.0.0.0 --port=80
Dockerfile內容解密:
- 基礎映像選擇:使用
python:3.8-alpine作為基礎映像,以減小映像大小。 - 安裝系統級依賴:安裝必要的編譯工具和函式庫,如
gcc、postgresql-dev等。 - 安裝Python依賴:複製
requirements.txt並安裝所需的Python套件。 - 清理構建依賴:刪除不再需要的構建依賴,以減小映像大小。
- 複製原始碼並安裝:將原始碼複製到映像中,並以可編輯模式安裝。
- 組態環境變數與啟動命令:設定必要的環境變數,並指定容器啟動時執行的命令。
測試組織
測試程式碼應與原始碼分開組織,通常放在tests資料夾下。以下是一個範例結構:
tests/
├── conftest.py
├── e2e
│ └── test_api.py
├── integration
│ ├── test_orm.py
│ └── test_repository.py
├── pytest.ini
└── unit
├── test_allocate.py
├── test_batches.py
└── test_services.py
測試內容解密:
- 分層測試:將測試分為單元測試(unit)、整合測試(integration)和端對端測試(e2e),以便於管理和執行不同型別的測試。
- 共用組態和fixtures:使用
conftest.py來定義共用的fixtures和組態。
結語
本文介紹瞭如何組織原始碼、組態Docker環境以及撰寫測試。透過遵循這些最佳實踐,可以提高專案的可維護性和可擴充套件性。
使用CSV實作Repository和Unit of Work模式
在前面的章節中,我們已經瞭解瞭如何使用SQLAlchemy和Flask來實作Repository和Unit of Work模式。然而,如果我們想要使用CSV檔案來儲存資料,而不是資料函式庫,該怎麼辦呢?本章節將介紹如何使用CSV實作Repository和Unit of Work模式。
CSV Repository的實作
首先,我們需要實作一個CSV Repository,它將負責從CSV檔案中讀取資料。以下是一個可能的實作:
class CsvRepository(repository.AbstractRepository):
def __init__(self, folder):
self._batches_path = Path(folder) / 'batches.csv'
self._allocations_path = Path(folder) / 'allocations.csv'
self._batches = {} # type: Dict[str, model.Batch]
self._load()
def get(self, reference):
return self._batches.get(reference)
def add(self, batch):
self._batches[batch.reference] = batch
def _load(self):
with self._batches_path.open() as f:
reader = csv.DictReader(f)
for row in reader:
ref, sku = row['ref'], row['sku']
qty = int(row['qty'])
if row['eta']:
eta = datetime.strptime(row['eta'], '%Y-%m-%d').date()
else:
eta = None
self._batches[ref] = model.Batch(ref=ref, sku=sku, qty=qty, eta=eta)
if self._allocations_path.exists() is False:
return
with self._allocations_path.open() as f:
reader = csv.DictReader(f)
for row in reader:
batchref, orderid, sku = row['batchref'], row['orderid'], row['sku']
qty = int(row['qty'])
line = model.OrderLine(orderid, sku, qty)
batch = self._batches[batchref]
batch._allocations.add(line)
def list(self):
return list(self._batches.values())
內容解密:
CsvRepository類別繼承自AbstractRepository,並實作了get、add和list方法。- 在
_load方法中,我們從batches.csv檔案中讀取資料,並建立Batch物件。 - 如果
allocations.csv檔案存在,我們也會從中讀取資料,並建立OrderLine物件,將其加入到對應的Batch物件中。
CSV Unit of Work的實作
接下來,我們需要實作一個CSV Unit of Work,它將負責將變更寫入到CSV檔案中。以下是一個可能的實作:
class CsvUnitOfWork(unit_of_work.AbstractUnitOfWork):
def __init__(self, folder):
self.batches = CsvRepository(folder)
def commit(self):
with self.batches._allocations_path.open('w') as f:
writer = csv.writer(f)
writer.writerow(['orderid', 'sku', 'qty', 'batchref'])
for batch in self.batches.list():
for line in batch._allocations:
writer.writerow([line.orderid, line.sku, line.qty, batch.reference])
def rollback(self):
pass
內容解密:
CsvUnitOfWork類別繼承自AbstractUnitOfWork,並實作了commit和rollback方法。- 在
commit方法中,我們將變更寫入到allocations.csv檔案中。 - 由於我們使用的是CSV檔案,因此我們不需要實作
rollback方法。
使用CSV Repository和Unit of Work
現在,我們可以使用CSV Repository和Unit of Work來實作我們的業務邏輯。以下是一個可能的實作:
def main(folder):
orders_path = Path(folder) / 'orders.csv'
uow = csv_uow.CsvUnitOfWork(folder)
with orders_path.open() as f:
reader = csv.DictReader(f)
for row in reader:
orderid, sku = row['orderid'], row['sku']
qty = int(row['qty'])
services.allocate(orderid, sku, qty, uow)
內容解密:
- 我們建立了一個
CsvUnitOfWork物件,並將其傳遞給services.allocate方法。 - 在
services.allocate方法中,我們使用CsvRepository來讀取和寫入資料。
在Django ORM類別上實作自定義方法以轉換至/自領域模型
在前面的章節中,我們探討瞭如何使用SQLAlchemy來實作領域模型與資料函式庫之間的對映。在本文中,我們將介紹如何在Django中實作類別似的功能。
Django ORM自定義方法
首先,我們需要在Django ORM類別上實作自定義方法,以便將領域模型轉換為Django ORM物件,反之亦然。以下是一個範例:
# src/djangoproject/alloc/models.py
from django.db import models
from allocation.domain import model as domain_model
class Batch(models.Model):
reference = models.CharField(max_length=255)
sku = models.CharField(max_length=255)
qty = models.IntegerField()
eta = models.DateField(blank=True, null=True)
@staticmethod
def update_from_domain(batch: domain_model.Batch):
try:
b = Batch.objects.get(reference=batch.reference)
except Batch.DoesNotExist:
b = Batch(reference=batch.reference)
b.sku = batch.sku
b.qty = batch._purchased_quantity
b.eta = batch.eta
b.save()
b.allocation_set.set(
Allocation.from_domain(l, b)
for l in batch._allocations
)
def to_domain(self) -> domain_model.Batch:
b = domain_model.Batch(
ref=self.reference, sku=self.sku, qty=self.qty, eta=self.eta
)
b._allocations = set(
a.line.to_domain()
for a in self.allocation_set.all()
)
return b
內容解密:
update_from_domain方法用於將領域模型中的Batch物件轉換為Django ORM中的Batch物件。如果資料函式庫中不存在對應的Batch物件,則會建立一個新的物件。to_domain方法用於將Django ORM中的Batch物件轉換為領域模型中的Batch物件。- 在
update_from_domain方法中,我們使用了try-except區塊來處理Batch物件不存在的情況。 - 在
to_domain方法中,我們將Allocation物件轉換為領域模型中的OrderLine物件。
工作單元(Unit of Work)模式與Django
接下來,我們需要實作工作單元模式,以便在Django中管理資料函式庫交易。以下是一個範例:
# src/allocation/service_layer/unit_of_work.py
class DjangoUnitOfWork(AbstractUnitOfWork):
def __enter__(self):
self.batches = repository.DjangoRepository()
transaction.set_autocommit(False)
return super().__enter__()
def __exit__(self, *args):
super().__exit__(*args)
transaction.set_autocommit(True)
def commit(self):
for batch in self.batches.seen:
self.batches.update(batch)
transaction.commit()
def rollback(self):
transaction.rollback()
內容解密:
DjangoUnitOfWork類別繼承自AbstractUnitOfWork,並實作了__enter__、__exit__、commit和rollback方法。- 在
__enter__方法中,我們設定了資料函式庫交易的自動提交為False,以便開始一個新的交易。 - 在
commit方法中,我們將所有變更提交到資料函式庫。 - 在
rollback方法中,我們回復所有變更。
Django檢視作為介面卡
最後,我們需要實作Django檢視,以便將HTTP請求轉換為服務層的呼叫。以下是一個範例:
# src/djangoproject/alloc/views.py
@csrf_exempt
def add_batch(request):
data = json.loads(request.body)
eta = data['eta']
if eta is not None:
eta = datetime.fromisoformat(eta).date()
services.add_batch(
data['ref'], data['sku'], data['qty'], eta,
unit_of_work.DjangoUnitOfWork(),
)
return HttpResponse('OK', status=201)
@csrf_exempt
def allocate(request):
data = json.loads(request.body)
try:
batchref = services.allocate(
data['orderid'],
data['sku'],
data['qty'],
unit_of_work.DjangoUnitOfWork(),
)
except (model.OutOfStock, services.InvalidSku) as e:
return JsonResponse({'message': str(e)}, status=400)
return JsonResponse({'batchref': batchref}, status=201)
內容解密:
add_batch和allocate檢視函式用於處理HTTP請求,並將請求資料轉換為服務層的呼叫。- 在
add_batch檢視函式中,我們呼叫了services.add_batch方法,以建立一個新的批次。 - 在
allocate檢視函式中,我們呼叫了services.allocate方法,以分配訂單行到批次中。