1. 简介
自然语言处理(Natural Language Processing, NLP)是一门涵盖人工智能、计算机科学、语言学等多个领域的交叉学科,其目标是让机器能够读懂、理解、生成和处理人类的自然语言。NLP的应用非常广泛,如语音识别、机器翻译、文本分类、文本生成等。其中机器翻译是NLP的重要应用之一,下面我们就来介绍Python中的机器翻译实例。
2. 机器翻译的概念
2.1 机器翻译的定义
机器翻译(Machine Translation, MT)是利用计算机程序将一种自然语言翻译成另一种自然语言的技术。它的目标是消除语言障碍,使得人们能够更加自由地交流。
2.2 机器翻译的发展历程
机器翻译的发展历程可以分为以下三个阶段:
基于规则的机器翻译(Rule-Based Machine Translation, RBMT):早期的机器翻译系统采用基于规则的方法,即利用语言学专家编写的知识和规则来进行翻译。这种方法需要大量的领域知识和专业知识,规则的编写工作量大,而且翻译效果较差。
基于统计的机器翻译(Statistical Machine Translation, SMT):随着计算机硬件的不断提升和语料库的增加,统计机器翻译逐渐兴起。这种方法利用大量的语料库训练模型,从而实现翻译。它的优点是可以自动学习翻译规则,但是需要大量的语料库,并且结果不够精确。
基于神经网络的机器翻译(Neural Machine Translation, NMT):基于神经网络的机器翻译是近年来发展的新兴领域,它利用深度学习算法,自动捕捉翻译的语义信息,从而实现更加准确的翻译。
3. Python中的机器翻译实例
3.1 Google的翻译API
Python中可以借助第三方库 googletrans 调用 Google 翻译,实现多种语言之间的翻译。googletrans 是非官方封装,直接访问 Google 翻译的网页接口,无需申请API密钥,但需要联网(Google 官方的 Cloud Translation API 才需要申请密钥)。具体代码如下:
import googletrans
from googletrans import Translator
# Shared googletrans client; requests are sent to the translate.google.cn endpoint.
# NOTE(review): confirm this endpoint is still reachable from your network.
translator = Translator(service_urls=['translate.google.cn'])
def translate(text, dest='en', src='auto'):
    """Translate ``text`` with the module-level googletrans ``translator``.

    Args:
        text: The string to translate.
        dest: Target language code (defaults to English).
        src: Source language code; ``'auto'`` asks the service to detect it.

    Returns:
        The translated text as a string.
    """
    # Pass the languages by keyword so the call cannot silently swap them.
    return translator.translate(text, dest=dest, src=src).text
# Demo: translate a Chinese sentence with the defaults
# (auto-detected source language, English target) and print the result.
text = '机器学习是一门计算机科学'
result = translate(text)
print(result)
注意:该方法需要联网,故需要保证网络连接正常。
3.2 PyTorch实现的神经机器翻译模型
基于神经网络的机器翻译是目前最流行的方法之一。下面我们将介绍如何使用PyTorch实现一个简单的神经翻译模型。
1. 安装必要的库
# torchtext.data.Field / BucketIterator (imported below) only exist in legacy
# torchtext releases, so pin a version that still ships them.
!pip install torch torchtext==0.6.0 spacy
# Download the models under their full names; the translation code loads 'de_core_news_sm'.
!python -m spacy download en_core_web_sm
!python -m spacy download de_core_news_sm
2. 导入必要的库
import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.datasets import Multi30k
from torchtext.data import Field, BucketIterator
import spacy
import numpy as np
import random
import math
import time
3. 定义模型
class Encoder(nn.Module):
    """LSTM encoder that embeds a source sequence and returns the final LSTM state.

    Args:
        input_dim: Size of the source vocabulary (number of embedding rows).
        emb_dim: Embedding dimension.
        hid_dim: LSTM hidden size.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied to the embeddings and between
            LSTM layers.
    """

    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(input_dim, emb_dim)
        # nn.LSTM applies dropout only between layers; with a single layer it
        # has no effect and only triggers a warning, so pass 0 in that case.
        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers,
                           dropout=dropout if n_layers > 1 else 0.0)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        """Encode ``src`` and return the final ``(hidden, cell)`` state.

        Args:
            src: Token indices of shape ``[src_len, batch_size]``.

        Returns:
            ``(hidden, cell)``, each of shape ``[n_layers, batch_size, hid_dim]``.
        """
        # embedded = [src_len, batch_size, emb_dim]
        embedded = self.dropout(self.embedding(src))
        # The per-step outputs are not needed; only the final state is returned.
        _, (hidden, cell) = self.rnn(embedded)
        return hidden, cell
class Decoder(nn.Module):
    """Single-step LSTM decoder conditioned on a context vector.

    At every step the embedded input token is concatenated with ``context``
    before entering the LSTM, and the prediction layer sees the embedding, the
    top-layer hidden state and the context.

    Args:
        output_dim: Size of the target vocabulary.
        emb_dim: Embedding dimension.
        hid_dim: LSTM hidden size (also the size of the context vector).
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied to the embeddings and between
            LSTM layers.
    """

    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        self.output_dim = output_dim
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(output_dim, emb_dim)
        # nn.LSTM applies dropout only between layers; with a single layer it
        # has no effect and only triggers a warning, so pass 0 in that case.
        self.rnn = nn.LSTM(emb_dim + hid_dim, hid_dim, n_layers,
                           dropout=dropout if n_layers > 1 else 0.0)
        self.fc_out = nn.Linear(emb_dim + hid_dim * 2, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell, context):
        """Decode one target token.

        Args:
            input: Previous target token indices, shape ``[batch_size]``.
            hidden: LSTM hidden state, shape ``[n_layers, batch_size, hid_dim]``.
            cell: LSTM cell state, shape ``[n_layers, batch_size, hid_dim]``.
            context: Context vector, shape ``[1, batch_size, hid_dim]`` (it is
                concatenated with the one-step embedding along the feature axis).

        Returns:
            ``(prediction, hidden, cell)`` where ``prediction`` has shape
            ``[batch_size, output_dim]`` and the states keep their input shapes.
        """
        # input = [1, batch_size]
        input = input.unsqueeze(0)
        # embedded = [1, batch_size, emb_dim]
        embedded = self.dropout(self.embedding(input))
        # emb_con = [1, batch_size, emb_dim + hid_dim]
        emb_con = torch.cat((embedded, context), dim=2)
        _, (hidden, cell) = self.rnn(emb_con, (hidden, cell))
        # Use the top layer's hidden state: squeeze(0) would be a no-op for
        # n_layers > 1 and leave a 3-D tensor that cannot be concatenated below.
        features = torch.cat(
            (embedded.squeeze(0), hidden[-1], context.squeeze(0)), dim=1)
        # features = [batch_size, emb_dim + hid_dim * 2]
        prediction = self.fc_out(features)
        # prediction = [batch_size, output_dim]
        return prediction, hidden, cell
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes a target sequence step by step.

    Args:
        encoder: Module mapping ``src`` to an ``(hidden, cell)`` state.
        decoder: Module called as ``decoder(input, hidden, cell, context)``;
            it must expose ``output_dim`` and ``hid_dim`` attributes.
        device: Device on which the output and context tensors are allocated.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Run the model over a batch.

        Args:
            src: Source token indices, shape ``[src_len, batch_size]``.
            trg: Target token indices, shape ``[trg_len, batch_size]``.
            teacher_forcing_ratio: Probability of feeding the ground-truth
                token (instead of the model's own prediction) at each step.

        Returns:
            Tensor of shape ``[trg_len, batch_size, output_dim]``; row 0 is
            left as zeros because decoding starts at position 1.
        """
        trg_len, batch_size = trg.shape[0], trg.shape[1]
        trg_vocab_size = self.decoder.output_dim
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size,
                              device=self.device)
        hidden, cell = self.encoder(src)
        # The first decoder input is the <sos> token row of the target.
        input = trg[0, :]
        # The context starts as zeros and is then refreshed from the decoder state.
        context = torch.zeros(1, batch_size, self.decoder.hid_dim,
                              device=self.device)
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell, context)
            outputs[t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = trg[t] if teacher_force else top1
            # Keep the context 3-D ([1, batch_size, hid_dim]) by taking the top
            # layer only; hidden.unsqueeze(0) would yield a 4-D tensor that
            # cannot be concatenated with the embedded token.
            context = hidden[-1].unsqueeze(0)
        return outputs
# Hyper-parameters and model construction.
# NOTE(review): SRC, TRG and device are not defined in the visible code; they
# must exist (fields with built vocabularies, a torch device) before this runs.
# Vocabulary sizes fix the embedding-table and output-layer sizes.
INPUT_DIM = len(SRC.vocab)
OUTPUT_DIM = len(TRG.vocab)
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
# Encoder and decoder share HID_DIM and N_LAYERS because the encoder's final
# (hidden, cell) state is passed straight into the decoder's LSTM.
HID_DIM = 512
N_LAYERS = 2
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5
enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT)
dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT)
model = Seq2Seq(enc, dec, device).to(device)
4. 训练模型
optimizer = optim.Adam(model.parameters())
# Look up the padding index through the field's own pad token. Indexing the
# vocabulary with an empty string does not name the pad token, so padded
# positions would not be excluded from the loss.
PAD_IDX = TRG.vocab.stoi[TRG.pad_token]
# Padded target positions contribute nothing to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)
def train(model, iterator, optimizer, criterion, clip):
    """Train ``model`` for one epoch and return the mean batch loss.

    Args:
        model: Module called as ``model(src, trg)`` that returns logits of
            shape ``[trg_len, batch_size, output_dim]``.
        iterator: Sized iterable of batches exposing ``.src`` and ``.trg``.
        optimizer: Optimizer over ``model.parameters()``.
        criterion: Loss taking ``([N, output_dim] logits, [N] targets)``.
        clip: Maximum gradient norm used for clipping.

    Returns:
        The summed batch losses divided by ``len(iterator)``.
    """
    model.train()
    epoch_loss = 0
    for batch in iterator:
        src, trg = batch.src, batch.trg
        optimizer.zero_grad()
        output = model(src, trg)
        # Drop position 0 (the start token, never predicted) and flatten the
        # time and batch axes, the layout nn.CrossEntropyLoss expects.
        # reshape (rather than view) also accepts non-contiguous tensors.
        output_dim = output.shape[-1]
        logits = output[1:].reshape(-1, output_dim)
        targets = trg[1:].reshape(-1)
        loss = criterion(logits, targets)
        loss.backward()
        # Clip gradients to guard against exploding gradients.
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)
# Build one iterator per dataset split, all with the same batch size and device.
# NOTE(review): train_data, valid_data, test_data, BATCH_SIZE and device are not
# defined in the visible code; they must be created before this statement runs.
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
(train_data, valid_data, test_data),
batch_size=BATCH_SIZE,
device=device)
# Training schedule: number of passes over the data and the gradient-norm cap.
N_EPOCHS = 10
CLIP = 1
# Per-epoch loss history.
train_loss_list = []
valid_loss_list = []
# NOTE(review): evaluate() and epoch_time() are not defined in the visible
# code; they must be provided before this loop runs.
for epoch in range(N_EPOCHS):
    start_time = time.time()
    train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_iterator, criterion)
    train_loss_list.append(train_loss)
    valid_loss_list.append(valid_loss)
    end_time = time.time()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    print(f'Epoch: {epoch + 1:02} | Time: {epoch_mins}m {epoch_secs}s')
    # Perplexity is the exponential of the mean loss.
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
    print(f'\t Val. Loss: {valid_loss:.3f} | Val. PPL: {math.exp(valid_loss):7.3f}')
3.3 实验结果
下面我们对模型进行测试,并输出一些例子:
def translate_sentence(sentence, src_field, trg_field, model, device, max_len=50):
    """Greedily decode a translation for one source sentence.

    Args:
        sentence: Either a raw string (tokenized with the spaCy
            ``de_core_news_sm`` model) or an iterable of token strings.
        src_field: Source field exposing ``init_token``, ``eos_token`` and
            ``vocab.stoi``.
        trg_field: Target field exposing ``init_token``, ``eos_token``,
            ``vocab.stoi`` and ``vocab.itos``.
        model: Model exposing ``encoder`` and ``decoder`` sub-modules; the
            decoder must expose ``hid_dim``.
        device: Device on which the input tensors are created.
        max_len: Maximum number of tokens to generate.

    Returns:
        ``(tokens, hidden, cell)``: the generated target tokens (without the
        start token; the end token is included if it was produced) and the
        final decoder state.
    """
    model.eval()
    if isinstance(sentence, str):
        nlp = spacy.load('de_core_news_sm')
        tokens = [token.text.lower() for token in nlp(sentence)]
    else:
        tokens = [token.lower() for token in sentence]
    tokens = [src_field.init_token] + tokens + [src_field.eos_token]
    src_indexes = [src_field.vocab.stoi[token] for token in tokens]
    # src_tensor = [src_len, 1]: a batch holding a single sentence.
    src_tensor = torch.LongTensor(src_indexes).unsqueeze(1).to(device)

    eos_idx = trg_field.vocab.stoi[trg_field.eos_token]
    trg_indexes = [trg_field.vocab.stoi[trg_field.init_token]]
    with torch.no_grad():
        hidden, cell = model.encoder(src_tensor)
        # The decoder requires a context argument. As in Seq2Seq.forward, it
        # starts as zeros and is then taken from the decoder's hidden state
        # (top layer, shaped [1, 1, hid_dim] to match the embedded token).
        context = torch.zeros(1, 1, model.decoder.hid_dim, device=device)
        for _ in range(max_len):
            trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)
            output, hidden, cell = model.decoder(trg_tensor, hidden, cell, context)
            pred_token = output.argmax(1).item()
            trg_indexes.append(pred_token)
            if pred_token == eos_idx:
                break
            context = hidden[-1].unsqueeze(0)
    trg_tokens = [trg_field.vocab.itos[idx] for idx in trg_indexes]
    return trg_tokens[1:], hidden, cell
def translate_sentence_beam_search(sentence, src_field, trg_field, model, device, beam_size=5, max_len=50):
    """Decode a translation for one source sentence with beam search.

    Args:
        sentence: Either a raw string (tokenized with the spaCy
            ``de_core_news_sm`` model) or an iterable of token strings.
        src_field: Source field exposing ``init_token``, ``eos_token`` and
            ``vocab.stoi``.
        trg_field: Target field exposing ``init_token``, ``eos_token``,
            ``vocab.stoi`` and ``vocab.itos``.
        model: Model exposing ``encoder`` and ``decoder`` sub-modules; the
            decoder must expose ``hid_dim``.
        device: Device on which the input tensors are created.
        beam_size: Number of hypotheses kept after every step.
        max_len: Maximum number of decoding steps.

    Returns:
        ``(tokens, hidden, cell)`` for the highest-scoring hypothesis: its
        target tokens (without the start token) and its decoder state. The
        score of a hypothesis is the sum of its token log-probabilities.
    """
    model.eval()
    if isinstance(sentence, str):
        nlp = spacy.load('de_core_news_sm')
        tokens = [token.text.lower() for token in nlp(sentence)]
    else:
        tokens = [token.lower() for token in sentence]
    tokens = [src_field.init_token] + tokens + [src_field.eos_token]
    src_indexes = [src_field.vocab.stoi[token] for token in tokens]
    # src_tensor = [src_len, 1]: a batch holding a single sentence.
    src_tensor = torch.LongTensor(src_indexes).unsqueeze(1).to(device)

    sos_idx = trg_field.vocab.stoi[trg_field.init_token]
    eos_idx = trg_field.vocab.stoi[trg_field.eos_token]
    with torch.no_grad():
        hidden, cell = model.encoder(src_tensor)
        # The decoder requires a context argument. As in Seq2Seq.forward, it
        # starts as zeros and is then taken from the decoder's hidden state;
        # every hypothesis carries its own copy.
        context = torch.zeros(1, 1, model.decoder.hid_dim, device=device)
        beams = [{'tokens': [sos_idx], 'hidden': hidden, 'cell': cell,
                  'context': context, 'score': 0.0}]
        for _ in range(max_len):
            new_beams = []
            all_finished = True
            for beam in beams:
                if beam['tokens'][-1] == eos_idx:
                    # Finished hypotheses are carried over unchanged.
                    new_beams.append(beam)
                    continue
                all_finished = False
                trg_tensor = torch.LongTensor([beam['tokens'][-1]]).to(device)
                output, hidden, cell = model.decoder(
                    trg_tensor, beam['hidden'], beam['cell'], beam['context'])
                # Never ask topk for more candidates than the vocabulary holds.
                k = min(beam_size, output.shape[-1])
                log_probs, indices = torch.topk(
                    torch.log_softmax(output, dim=-1), k)
                next_context = hidden[-1].unsqueeze(0)
                # Pair each candidate token with its own log-probability; the
                # top-k values are ordered by rank, not by vocabulary index.
                for log_prob, index in zip(log_probs[0].tolist(), indices[0].tolist()):
                    new_beams.append({
                        'tokens': beam['tokens'] + [index],
                        'hidden': hidden,
                        'cell': cell,
                        'context': next_context,
                        'score': beam['score'] + log_prob,
                    })
            if all_finished:
                break
            beams = sorted(new_beams, key=lambda b: b['score'], reverse=True)[:beam_size]
    best = beams[0]
    trg_tokens = [trg_field.vocab.itos[idx] for idx in best['tokens']]
    return trg_tokens[1:], best['hidden'], best['cell']
def translate(text, model, src_field, trg_field, method='greedy', beam_size=5, max_len=50, device=None):
    """Translate ``text`` with ``model`` and return the result as one string.

    Args:
        text: Source sentence, as a raw string or an iterable of tokens.
        model: Trained translation model.
        src_field: Source-language field.
        trg_field: Target-language field.
        method: ``'greedy'`` or ``'beam_search'``.
        beam_size: Beam width (used by ``'beam_search'`` only).
        max_len: Maximum number of generated tokens.
        device: Device for the input tensors. When ``None`` it is taken from
            the model's first parameter (CPU if the model has none), so the
            function does not depend on a module-level ``device`` variable.

    Returns:
        The generated target tokens joined by single spaces.

    Raises:
        NotImplementedError: If ``method`` is not a supported decoding method.
    """
    # Reject unknown methods before touching the model.
    if method not in ('greedy', 'beam_search'):
        raise NotImplementedError(f'unknown decoding method: {method!r}')
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')
    if method == 'greedy':
        translation, _, _ = translate_sentence(
            text, src_field, trg_field, model, device, max_len)
    else:
        translation, _, _ = translate_sentence_beam_search(
            text, src_field, trg_field, model, device, beam_size, max_len)
    return ' '.join(translation)
def test_translation(model, method='greedy', beam_size=5, max_len=50):
    """Print source, reference and predicted translation for two test examples.

    The first sentence of each of the first two batches of the module-level
    ``test_iterator`` is translated and printed.

    Args:
        model: Trained translation model.
        method: Decoding method forwarded to ``translate``.
        beam_size: Beam width forwarded to ``translate``.
        max_len: Maximum number of generated tokens.
    """
    # Special tokens are removed so they are neither printed nor fed back into
    # the tokenizer (translation adds the start/end tokens itself).
    src_specials = {SRC.init_token, SRC.eos_token, SRC.pad_token}
    trg_specials = {TRG.init_token, TRG.eos_token, TRG.pad_token}
    for batch_no, batch in enumerate(test_iterator):
        if batch_no >= 2:  # only print 2 examples
            break
        # Batches are [seq_len, batch_size]; column 0 is the first sentence.
        src_tokens = [SRC.vocab.itos[idx] for idx in batch.src[:, 0].tolist()]
        src_tokens = [tok for tok in src_tokens if tok not in src_specials]
        trg_tokens = [TRG.vocab.itos[idx] for idx in batch.trg[:, 0].tolist()]
        trg_tokens = [tok for tok in trg_tokens if tok not in trg_specials]
        # Pass the token list, not a joined string: a string would be
        # re-tokenized and special tokens like "<sos>" would be split apart.
        pred = translate(src_tokens, model, SRC, TRG, method, beam_size, max_len)
        text = ' '.join(src_tokens)
        trg_text = ' '.join(trg_tokens)
        print(f'SRC: {text}')
        print(f'TRG: {trg_text}')
        print(f'PRED_{method.upper()}0: {pred}')
我们使用测试集的