-
Notifications
You must be signed in to change notification settings - Fork 216
Expand file tree
/
Copy pathtranslate.py
More file actions
234 lines (198 loc) · 9.25 KB
/
translate.py
File metadata and controls
234 lines (198 loc) · 9.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import re
import time
import json
import os
from openai import OpenAI
from googletrans import Translator
from langdetect import detect, DetectorFactory
# Set the seed so that language-detection results are consistent.
DetectorFactory.seed = 0
def load_translation_cache(cache_file):
    """Load the JSON translation cache stored at ``cache_file``.

    Every cached value is passed through ``clean_translation`` before the
    mapping is returned. When the file does not exist an empty dict is
    returned instead.
    """
    if not os.path.exists(cache_file):
        return {}

    with open(cache_file, 'r', encoding='utf-8') as handle:
        cached = json.load(handle)

    # Normalise entries written by earlier runs.
    for source_text in cached:
        cached[source_text] = clean_translation(cached[source_text])
    return cached
def save_translation_cache(cache_file, translations):
    """Write ``translations`` to ``cache_file`` as indented UTF-8 JSON.

    Each value is first run through ``clean_translation``; the cleaned value
    is stored back into ``translations``, so the caller's dict is modified
    in place.
    """
    # Snapshot the items so the dict can be updated while walking it.
    for source_text, translated in list(translations.items()):
        translations[source_text] = clean_translation(translated)

    with open(cache_file, 'w', encoding='utf-8') as handle:
        json.dump(translations, handle, ensure_ascii=False, indent=2)
def get_version(version_file):
    """Return the whitespace-stripped text of ``version_file``.

    Returns ``None`` when the file does not exist.
    """
    if not os.path.exists(version_file):
        return None

    with open(version_file, 'r', encoding='utf-8') as handle:
        raw = handle.read()
    return raw.strip()
def update_version(version_file):
    """Overwrite ``version_file`` with the current Unix time in whole seconds.

    Returns the timestamp string that was written.
    """
    stamp = f"{int(time.time())}"
    with open(version_file, 'w', encoding='utf-8') as handle:
        handle.write(stamp)
    return stamp
def contains_chinese(text):
    """Return True if any character of ``text`` lies in U+4E00..U+9FFF."""
    for ch in text:
        if '\u4e00' <= ch <= '\u9fff':
            return True
    return False
# def contains_target_language_characters(text, target_lang):
# try:
# detected_lang = detect(text)
# return detected_lang == target_lang
# except Exception as e:
# print(f"Language detection failed: {e}")
# return False
def translate_text_qwen_mt(text, target_lang):
    """Translate Chinese ``text`` into ``target_lang`` with the qwen-mt-plus model.

    The API key is read from the ``AI_API_KEY`` environment variable and the
    request goes through the OpenAI client pointed at the DashScope
    compatible-mode endpoint.

    Returns the translation lower-cased and with trailing ``.,!?;:``
    characters removed, or ``""`` if the request or post-processing fails
    (the error is printed).

    Raises:
        ValueError: if ``AI_API_KEY`` is unset or empty.
    """
    api_key = os.environ.get("AI_API_KEY")
    if not api_key:
        raise ValueError("AI_API_KEY environment variable is not set.")

    client = OpenAI(
        api_key=api_key,
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    )
    request = dict(
        model="qwen-mt-plus",
        messages=[{'role': 'user', 'content': text}],
        extra_body={
            "translation_options": {
                "source_lang": "zh",  # the source language is always Chinese
                "target_lang": target_lang,
            }
        },
    )

    try:
        completion = client.chat.completions.create(**request)
        raw = completion.choices[0].message.content
        # capitalize() followed by lower() leaves the whole string lower-cased.
        lowered = raw.capitalize().lower()
        return lowered.rstrip('.,!?;:')
    except Exception as e:
        print(f"Qwen-MT-Plus translation failed: {e}")
        return ""
def translate_text_google(text, target_lang):
    """Translate Chinese ``text`` into ``target_lang`` via googletrans.

    Returns the translation lower-cased and with trailing ``.,!?;:``
    characters removed. Any exception is printed and ``""`` is returned.
    """
    try:
        google = Translator(service_urls=['translate.google.com'])
        # The source language is always Chinese.
        result = google.translate(text, src='zh-cn', dest=target_lang)
        # capitalize() followed by lower() leaves the whole string lower-cased.
        lowered = result.text.capitalize().lower()
        return lowered.rstrip('.,!?;:')
    except Exception as e:
        print(f"Google Translate failed: {e}")
        return ""
def needs_fallback_translation(translated_text):
    """Return True if ``translated_text`` contains a newline or a double quote."""
    return any(marker in translated_text for marker in ('\n', '"'))
def clean_translation(text):
    """Return ``text`` with every newline and double-quote character removed."""
    cleaned = text
    for unwanted in ('\n', '"'):
        cleaned = cleaned.replace(unwanted, '')
    return cleaned
def _write_translation(content, msgid_text, translated_text):
    """Return ``content`` with the msgstr of each ``msgid_text`` entry set to ``translated_text``."""
    entry = f'msgid "{msgid_text}"\nmsgstr "{translated_text}"'
    # A callable replacement inserts the text literally. With a template
    # string, re.sub would interpret backslashes found in the msgid or the
    # translation (turning a PO "\n" escape into a real newline, or raising
    # re.error on sequences such as "\d").
    return re.sub(
        rf'msgid "{re.escape(msgid_text)}"\s*\nmsgstr ".*?"',
        lambda _match: entry,
        content,
    )


def translate_po_file(input_file, output_file, target_lang_code, target_lang_name):
    """Fill in the msgstr entries of a PO file from the cache or by translating.

    For every single-line ``msgid``/``msgstr`` pair in ``input_file`` the
    msgid is looked up in the JSON cache; a non-empty cached value is written
    into the msgstr. Otherwise the msgid is translated with
    ``translate_text_qwen_mt``, falling back to ``translate_text_google``
    when the result is empty, still contains Chinese, or contains a newline
    or double quote.

    Args:
        input_file: path of the PO file to read.
        output_file: path the processed PO content is written to. The cache
            file ``cache_<target_lang_name>.json`` and the ``.no-update``
            marker are placed in this file's directory.
        target_lang_code: language code passed to the translators; also used
            for the ``languages/<code>/LC_MESSAGES`` directory that holds the
            ``version`` file.
        target_lang_name: language name used in the cache file name.

    Side effects: creates the LC_MESSAGES directory if needed, rewrites the
    cache and the version file when anything changed (otherwise writes a
    ``<output_file>.no-update`` marker), and always writes ``output_file``.
    """
    lang_dir = os.path.dirname(output_file)
    lc_messages_dir = os.path.join('languages', target_lang_code, 'LC_MESSAGES')
    os.makedirs(lc_messages_dir, exist_ok=True)

    cache_file = os.path.join(lang_dir, f'cache_{target_lang_name}.json')
    version_file = os.path.join(lc_messages_dir, 'version')

    translations = load_translation_cache(cache_file)
    current_version = get_version(version_file)

    with open(input_file, 'r', encoding='utf-8') as f:
        content = f.read()

    # Matches every single-line msgid/msgstr pair, whether or not the msgstr
    # is already filled in. The iterator keeps scanning the text as it was
    # read, even though ``content`` is rebound inside the loop.
    pattern = r'msgid "(.+?)"\s*\nmsgstr "(.*?)"'
    matches = re.finditer(pattern, content)

    updated = False
    used_translations = set()  # msgids seen in this file; others are pruned from the cache

    for match in matches:
        msgid_text = match.group(1)

        # Cache lookup.
        if msgid_text in translations:
            translated_text = clean_translation(translations[msgid_text])
            if translated_text:
                print(f"Using cached translation: {msgid_text} -> {translated_text}")
                content = _write_translation(content, msgid_text, translated_text)
                used_translations.add(msgid_text)
                continue
            # An empty cached value (e.g. left by an earlier failed attempt)
            # is not usable; fall through and translate again.
            updated = True
            print(f"Cached translation is empty for: {msgid_text}. Re-translating...")

        # Fresh translation, with a small retry loop.
        try:
            max_retries = 3
            for attempt in range(max_retries):
                try:
                    time.sleep(0.1)  # short pause so requests are not sent too quickly
                    translated_text = translate_text_qwen_mt(msgid_text, target_lang_code)
                    # Fall back when Qwen returned nothing (its failure
                    # value), left Chinese in place, or produced characters
                    # that cannot go into a one-line msgstr.
                    if (not translated_text
                            or contains_chinese(translated_text)
                            or needs_fallback_translation(translated_text)):
                        print("Translation does not meet criteria using Qwen-MT-Plus. Using Google Translate...")
                        translated_text = translate_text_google(msgid_text, target_lang_code)
                    translated_text = clean_translation(translated_text)

                    if msgid_text in translations and translations[msgid_text] != translated_text:
                        print(f"Translation changed for: {msgid_text} -> {translated_text}")
                        updated = True

                    translations[msgid_text] = translated_text
                    print(f"New translation [{target_lang_code}]: {msgid_text} -> {translated_text}")
                    used_translations.add(msgid_text)
                    break
                except Exception:
                    if attempt == max_retries - 1:
                        raise
                    print(f"Retry {attempt + 1}/{max_retries} for: {msgid_text}")
                    time.sleep(0.1)
        except Exception as e:
            print(f"Translation failed for: {msgid_text}")
            print(f"Error: {e}")
            # All attempts raised: drop the entry from both the cache and the
            # PO content.
            # NOTE(review): this also fires when AI_API_KEY is missing, which
            # removes every untranslated entry from the file - confirm intended.
            if msgid_text in translations:
                del translations[msgid_text]
            content = re.sub(
                rf'msgid "{re.escape(msgid_text)}"\s*\nmsgstr ".*?"\n?',
                '',
                content,
            )
            updated = True
            continue

        # Only write the entry back when a non-empty translation was obtained.
        if translated_text:
            content = _write_translation(content, msgid_text, translated_text)
            updated = True
            used_translations.add(msgid_text)

    # Prune cache entries whose msgid no longer appears in the PO file.
    for key in list(translations):
        if key not in used_translations:
            print(f"Removing unused cache entry: {key}")
            del translations[key]
            updated = True

    if updated:
        save_translation_cache(cache_file, translations)
        new_version = update_version(version_file)
        print(f"Updated version from {current_version} to {new_version}")
    else:
        print("No updates.")
        # Leave a marker file next to the output to signal that nothing changed.
        no_update_file = os.path.join(
            os.path.dirname(output_file),
            f'{os.path.basename(output_file)}.no-update',
        )
        with open(no_update_file, 'w', encoding='utf-8') as f:
            f.write("# No updates.\n")

    # Make sure each msgstr starts directly on the line after its msgid.
    content = re.sub(r'\n\s*msgstr', '\nmsgstr', content)

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(content)
if __name__ == '__main__':
    # (language code, language name) pairs to translate into. The input and
    # output paths are identical, so each PO file is rewritten in place.
    target_languages = [
        ('en', 'English'),
        ('fa', 'Persian'),
        ('ru', 'Russian'),
        ('ko', 'Korean'),
        ('fr', 'French'),
    ]
    for lang_code, lang_name in target_languages:
        print(f"\nTranslating to {lang_name} ({lang_code})...")
        input_file = output_file = f'po/{lang_code}.po'
        translate_po_file(input_file, output_file, lang_code, lang_name)