Skip to content

Commit 9f529ec

Browse files
committed
feat(http): reuse connection
1 parent dd55eee commit 9f529ec

File tree

8 files changed

+2371
-47
lines changed

8 files changed

+2371
-47
lines changed

dashscope/__init__.py

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
base_http_api_url,
2626
base_websocket_api_url,
2727
)
28+
from dashscope.common.aio_session_manager import AioSessionManager
29+
from dashscope.common.session_manager import SessionManager
2830
from dashscope.customize.deployments import Deployments
2931
from dashscope.customize.finetunes import FineTunes
3032
from dashscope.embeddings.batch_text_embedding import BatchTextEmbedding
@@ -64,6 +66,276 @@
6466
list_tokenizers,
6567
)
6668

69+
70+
def enable_http_connection_pool(
71+
pool_connections: int = None,
72+
pool_maxsize: int = None,
73+
max_retries: int = None,
74+
pool_block: bool = None,
75+
):
76+
"""
77+
启用 HTTP 连接池复用
78+
79+
启用后,所有同步 HTTP 请求将复用连接,显著减少延迟。
80+
81+
Args:
82+
pool_connections: 连接池大小,默认 10
83+
- 低并发(< 10 req/s): 10
84+
- 中并发(10-50 req/s): 20-30
85+
- 高并发(> 50 req/s): 50-100
86+
87+
pool_maxsize: 最大连接数,默认 20
88+
- 应该 >= pool_connections
89+
- 低并发: 20
90+
- 中并发: 50
91+
- 高并发: 100-200
92+
93+
max_retries: 重试次数,默认 3
94+
- 网络稳定: 3
95+
- 网络不稳定: 5-10
96+
97+
pool_block: 连接池满时是否阻塞,默认 False
98+
- False: 连接池满时创建新连接(推荐)
99+
- True: 连接池满时等待可用连接
100+
101+
Examples:
102+
>>> import dashscope
103+
>>>
104+
>>> # 使用默认配置
105+
>>> dashscope.enable_http_connection_pool()
106+
>>>
107+
>>> # 自定义配置
108+
>>> dashscope.enable_http_connection_pool(
109+
... pool_connections=20,
110+
... pool_maxsize=50
111+
... )
112+
>>>
113+
>>> # 之后的所有请求都会复用连接
114+
>>> Generation.call(model='qwen-turbo', prompt='Hello')
115+
"""
116+
SessionManager.get_instance().enable(
117+
pool_connections=pool_connections,
118+
pool_maxsize=pool_maxsize,
119+
max_retries=max_retries,
120+
pool_block=pool_block,
121+
)
122+
123+
124+
def disable_http_connection_pool():
125+
"""
126+
禁用 HTTP 连接池复用
127+
128+
恢复到原有的每次请求创建新连接的行为。
129+
130+
Example:
131+
>>> import dashscope
132+
>>> dashscope.disable_http_connection_pool()
133+
"""
134+
SessionManager.get_instance().disable()
135+
136+
137+
def reset_http_connection_pool():
138+
"""
139+
重置 HTTP 连接池
140+
141+
用于处理连接问题或网络切换场景。
142+
143+
Example:
144+
>>> import dashscope
145+
>>> dashscope.reset_http_connection_pool()
146+
"""
147+
SessionManager.get_instance().reset()
148+
149+
150+
def configure_http_connection_pool(
151+
pool_connections: int = None,
152+
pool_maxsize: int = None,
153+
max_retries: int = None,
154+
pool_block: bool = None,
155+
):
156+
"""
157+
配置 HTTP 连接池参数
158+
159+
运行时动态调整连接池配置。
160+
161+
Args:
162+
pool_connections: 连接池大小
163+
pool_maxsize: 最大连接数
164+
max_retries: 重试次数
165+
pool_block: 连接池满时是否阻塞
166+
167+
Examples:
168+
>>> import dashscope
169+
>>>
170+
>>> # 调整单个参数
171+
>>> dashscope.configure_http_connection_pool(pool_maxsize=100)
172+
>>>
173+
>>> # 调整多个参数
174+
>>> dashscope.configure_http_connection_pool(
175+
... pool_connections=50,
176+
... pool_maxsize=100
177+
... )
178+
"""
179+
SessionManager.get_instance().configure(
180+
pool_connections=pool_connections,
181+
pool_maxsize=pool_maxsize,
182+
max_retries=max_retries,
183+
pool_block=pool_block,
184+
)
185+
186+
187+
async def enable_aio_http_connection_pool(
188+
limit: int = None,
189+
limit_per_host: int = None,
190+
ttl_dns_cache: int = None,
191+
keepalive_timeout: int = None,
192+
force_close: bool = None,
193+
):
194+
"""
195+
启用异步 HTTP 连接池复用
196+
197+
启用后,所有异步 HTTP 请求将复用连接,显著减少延迟。
198+
199+
Args:
200+
limit: 总连接数限制,默认 100
201+
- 低并发(< 10 req/s): 100
202+
- 中并发(10-50 req/s): 200
203+
- 高并发(> 50 req/s): 300-500
204+
205+
limit_per_host: 每个主机的连接数限制,默认 30
206+
- 应该 <= limit
207+
- 低并发: 30
208+
- 中并发: 50
209+
- 高并发: 100
210+
211+
ttl_dns_cache: DNS 缓存 TTL(秒),默认 300
212+
- DNS 稳定: 300-600
213+
- DNS 变化频繁: 60-120
214+
215+
keepalive_timeout: Keep-Alive 超时(秒),默认 30
216+
- 短连接: 15-30
217+
- 长连接: 60-120
218+
219+
force_close: 是否强制关闭连接,默认 False
220+
- False: 复用连接(推荐)
221+
- True: 每次关闭连接
222+
223+
Examples:
224+
>>> import asyncio
225+
>>> import dashscope
226+
>>> from dashscope import AioGeneration
227+
>>>
228+
>>> async def main():
229+
... # 使用默认配置
230+
... await dashscope.enable_aio_http_connection_pool()
231+
...
232+
... # 之后的所有异步请求都会复用连接
233+
... response = await AioGeneration.call(
234+
... model='qwen-turbo',
235+
... prompt='Hello'
236+
... )
237+
...
238+
... # 自定义配置
239+
... await dashscope.enable_aio_http_connection_pool(
240+
... limit=200,
241+
... limit_per_host=50
242+
... )
243+
>>>
244+
>>> asyncio.run(main())
245+
"""
246+
manager = await AioSessionManager.get_instance()
247+
await manager.enable(
248+
limit=limit,
249+
limit_per_host=limit_per_host,
250+
ttl_dns_cache=ttl_dns_cache,
251+
keepalive_timeout=keepalive_timeout,
252+
force_close=force_close,
253+
)
254+
255+
256+
async def disable_aio_http_connection_pool():
257+
"""
258+
禁用异步 HTTP 连接池复用
259+
260+
恢复到原有的每次请求创建新连接的行为。
261+
262+
Examples:
263+
>>> import asyncio
264+
>>> import dashscope
265+
>>>
266+
>>> async def main():
267+
... await dashscope.disable_aio_http_connection_pool()
268+
>>>
269+
>>> asyncio.run(main())
270+
"""
271+
manager = await AioSessionManager.get_instance()
272+
await manager.disable()
273+
274+
275+
async def reset_aio_http_connection_pool():
276+
"""
277+
重置异步 HTTP 连接池
278+
279+
用于处理连接问题或网络切换场景。
280+
281+
Examples:
282+
>>> import asyncio
283+
>>> import dashscope
284+
>>>
285+
>>> async def main():
286+
... await dashscope.reset_aio_http_connection_pool()
287+
>>>
288+
>>> asyncio.run(main())
289+
"""
290+
manager = await AioSessionManager.get_instance()
291+
await manager.reset()
292+
293+
294+
async def configure_aio_http_connection_pool(
295+
limit: int = None,
296+
limit_per_host: int = None,
297+
ttl_dns_cache: int = None,
298+
keepalive_timeout: int = None,
299+
force_close: bool = None,
300+
):
301+
"""
302+
配置异步 HTTP 连接池参数
303+
304+
运行时动态调整连接池配置。
305+
306+
Args:
307+
limit: 总连接数限制
308+
limit_per_host: 每个主机的连接数限制
309+
ttl_dns_cache: DNS 缓存 TTL(秒)
310+
keepalive_timeout: Keep-Alive 超时(秒)
311+
force_close: 是否强制关闭连接
312+
313+
Examples:
314+
>>> import asyncio
315+
>>> import dashscope
316+
>>>
317+
>>> async def main():
318+
... # 调整单个参数
319+
... await dashscope.configure_aio_http_connection_pool(limit=200)
320+
...
321+
... # 调整多个参数
322+
... await dashscope.configure_aio_http_connection_pool(
323+
... limit=200,
324+
... limit_per_host=50
325+
... )
326+
>>>
327+
>>> asyncio.run(main())
328+
"""
329+
manager = await AioSessionManager.get_instance()
330+
await manager.configure(
331+
limit=limit,
332+
limit_per_host=limit_per_host,
333+
ttl_dns_cache=ttl_dns_cache,
334+
keepalive_timeout=keepalive_timeout,
335+
force_close=force_close,
336+
)
337+
338+
67339
__all__ = [
68340
"base_http_api_url",
69341
"base_websocket_api_url",
@@ -118,6 +390,14 @@
118390
"MessageFile",
119391
"AssistantFile",
120392
"VideoSynthesis",
393+
"enable_http_connection_pool",
394+
"disable_http_connection_pool",
395+
"reset_http_connection_pool",
396+
"configure_http_connection_pool",
397+
"enable_aio_http_connection_pool",
398+
"disable_aio_http_connection_pool",
399+
"reset_aio_http_connection_pool",
400+
"configure_aio_http_connection_pool",
121401
]
122402

123403
logging.getLogger(__name__).addHandler(NullHandler())

dashscope/api_entities/api_request_factory.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ def _get_protocol_params(kwargs):
3636
base_address = kwargs.pop("base_address", None)
3737
flattened_output = kwargs.pop("flattened_output", False)
3838
extra_url_parameters = kwargs.pop("extra_url_parameters", None)
39+
session = kwargs.pop("session", None)
40+
aio_session = kwargs.pop("aio_session", None)
3941

4042
# Extract user-agent from headers if present
4143
user_agent = ""
@@ -58,6 +60,8 @@ def _get_protocol_params(kwargs):
5860
flattened_output,
5961
extra_url_parameters,
6062
user_agent,
63+
session,
64+
aio_session,
6165
)
6266

6367

@@ -87,6 +91,8 @@ def _build_api_request( # pylint: disable=too-many-branches
8791
flattened_output,
8892
extra_url_parameters,
8993
user_agent,
94+
session,
95+
aio_session,
9096
) = _get_protocol_params(kwargs)
9197
task_id = kwargs.pop("task_id", None)
9298
enable_encryption = kwargs.pop("enable_encryption", False)
@@ -130,6 +136,8 @@ def _build_api_request( # pylint: disable=too-many-branches
130136
flattened_output=flattened_output,
131137
encryption=encryption,
132138
user_agent=user_agent,
139+
session=session,
140+
aio_session=aio_session,
133141
)
134142
elif api_protocol == ApiProtocol.WEBSOCKET:
135143
if base_address is not None:

0 commit comments

Comments
 (0)