Skip to content

Commit 130ac0c

Browse files
authored
Fix PYTHON_THREADPOOL_THREAD_COUNT not apply to Linux Conusmption (#774)
* Move thread count log to function invocation * Also recreate sync_call_tp in Linux Consumption specialization * Add unit tests for Linux Consumption scenario * Fix flake8 * Add unittests for invocation logs * Make sync call tp allocation explicit * Fix deallocation issue * Fix PR issues
1 parent 6d6ccb3 commit 130ac0c

File tree

5 files changed

+416
-56
lines changed

5 files changed

+416
-56
lines changed

azure_functions_worker/dispatcher.py

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import threading
1717
from asyncio import BaseEventLoop
1818
from logging import LogRecord
19-
from typing import Optional
19+
from typing import Optional, List
2020

2121
import grpc
2222

@@ -79,8 +79,8 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
7979
# PYTHON_THREADPOOL_THREAD_COUNT app setting. The default value is 1.
8080
self._sync_tp_max_workers: int = self._get_sync_tp_max_workers()
8181
self._sync_call_tp: concurrent.futures.Executor = (
82-
concurrent.futures.ThreadPoolExecutor(
83-
max_workers=self._sync_tp_max_workers))
82+
self._create_sync_call_tp(self._sync_tp_max_workers)
83+
)
8484

8585
self._grpc_connect_timeout: float = grpc_connect_timeout
8686
# This is set to -1 by default to remove the limitation on msg size
@@ -97,9 +97,7 @@ async def connect(cls, host: str, port: int, worker_id: str,
9797
disp = cls(loop, host, port, worker_id, request_id, connect_timeout)
9898
disp._grpc_thread.start()
9999
await disp._grpc_connected_fut
100-
logger.info('Successfully opened gRPC channel to %s:%s '
101-
'with sync threadpool max workers set to %s',
102-
host, port, disp._sync_tp_max_workers)
100+
logger.info('Successfully opened gRPC channel to %s:%s ', host, port)
103101
return disp
104102

105103
async def dispatch_forever(self):
@@ -161,9 +159,7 @@ def stop(self) -> None:
161159
self._grpc_thread.join()
162160
self._grpc_thread = None
163161

164-
if self._sync_call_tp is not None:
165-
self._sync_call_tp.shutdown()
166-
self._sync_call_tp = None
162+
self._stop_sync_call_tp()
167163

168164
def on_logging(self, record: logging.LogRecord, formatted_msg: str) -> None:
169165
if record.levelno >= logging.CRITICAL:
@@ -318,11 +314,19 @@ async def _handle__invocation_request(self, req):
318314
fi: functions.FunctionInfo = self._functions.get_function(
319315
function_id)
320316

321-
logger.info(f'Received FunctionInvocationRequest, '
322-
f'request ID: {self.request_id}, '
323-
f'function ID: {function_id}, '
324-
f'invocation ID: {invocation_id}, '
325-
f'function type: {"async" if fi.is_async else "sync"}')
317+
function_invocation_logs: List[str] = [
318+
'Received FunctionInvocationRequest',
319+
f'request ID: {self.request_id}',
320+
f'function ID: {function_id}',
321+
f'invocation ID: {invocation_id}',
322+
f'function type: {"async" if fi.is_async else "sync"}'
323+
]
324+
if not fi.is_async:
325+
function_invocation_logs.append(
326+
f'sync threadpool max workers: {self._sync_tp_max_workers}'
327+
)
328+
logger.info(', '.join(function_invocation_logs))
329+
326330
args = {}
327331
for pb in invoc_request.input_data:
328332
pb_type_info = fi.input_types[pb.name]
@@ -426,6 +430,13 @@ async def _handle__function_environment_reload_request(self, req):
426430
for var in env_vars:
427431
os.environ[var] = env_vars[var]
428432

433+
# Apply PYTHON_THREADPOOL_THREAD_COUNT
434+
self._stop_sync_call_tp()
435+
self._sync_tp_max_workers = self._get_sync_tp_max_workers()
436+
self._sync_call_tp = (
437+
self._create_sync_call_tp(self._sync_tp_max_workers)
438+
)
439+
429440
# Reload package namespaces for customer's libraries
430441
packages_to_reload = ['azure', 'google']
431442
for p in packages_to_reload:
@@ -479,6 +490,15 @@ def _change_cwd(self, new_cwd: str):
479490
else:
480491
logger.warning('Directory %s is not found when reloading', new_cwd)
481492

493+
def _stop_sync_call_tp(self):
494+
"""Deallocate the current synchronous thread pool and assign
495+
self._sync_call_tp to None. If the thread pool does not exist,
496+
this will be a no op.
497+
"""
498+
if getattr(self, '_sync_call_tp', None):
499+
self._sync_call_tp.shutdown()
500+
self._sync_call_tp = None
501+
482502
def _get_sync_tp_max_workers(self) -> int:
483503
def tp_max_workers_validator(value: str) -> bool:
484504
try:
@@ -501,6 +521,17 @@ def tp_max_workers_validator(value: str) -> bool:
501521
default_value=f'{PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT}',
502522
validator=tp_max_workers_validator))
503523

524+
def _create_sync_call_tp(
525+
self, max_worker: int) -> concurrent.futures.Executor:
526+
"""Create a thread pool executor with max_worker. This is a wrapper
527+
over ThreadPoolExecutor constructor. Consider calling this method after
528+
_stop_sync_call_tp() to ensure only 1 synchronous thread pool is
529+
running.
530+
"""
531+
return concurrent.futures.ThreadPoolExecutor(
532+
max_workers=max_worker
533+
)
534+
504535
def __run_sync_func(self, invocation_id, func, params):
505536
# This helper exists because we need to access the current
506537
# invocation_id from ThreadPoolExecutor's threads.

azure_functions_worker/testutils.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,28 @@ async def invoke_function(
408408

409409
return invocation_id, r
410410

411+
async def reload_environment(
412+
self,
413+
environment: typing.Dict[str, str],
414+
function_project_path: str = '/home/site/wwwroot'
415+
) -> protos.FunctionEnvironmentReloadResponse:
416+
417+
request_content = protos.FunctionEnvironmentReloadRequest(
418+
function_app_directory=function_project_path,
419+
environment_variables={
420+
k.encode(): v.encode() for k, v in environment.items()
421+
}
422+
)
423+
424+
r = await self.communicate(
425+
protos.StreamingMessage(
426+
function_environment_reload_request=request_content
427+
),
428+
wait_for='function_environment_reload_response'
429+
)
430+
431+
return r
432+
411433
async def send(self, message):
412434
self._in_queue.put_nowait((message, None))
413435

@@ -453,12 +475,12 @@ def _read_available_functions(self):
453475

454476
class _MockWebHostController:
455477

456-
def __init__(self, scripts_dir):
457-
self._host = None
458-
self._scripts_dir = scripts_dir
459-
self._worker = None
478+
def __init__(self, scripts_dir: pathlib.PurePath):
479+
self._host: typing.Optional[_MockWebHost] = None
480+
self._scripts_dir: pathlib.PurePath = scripts_dir
481+
self._worker: typing.Optional[dispatcher.Dispatcher] = None
460482

461-
async def __aenter__(self):
483+
async def __aenter__(self) -> _MockWebHost:
462484
loop = aio_compat.get_running_loop()
463485
self._host = _MockWebHost(loop, self._scripts_dir)
464486

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
import json
4+
import azure.functions as func
5+
6+
7+
async def main(req: func.HttpRequest,
8+
context: func.Context) -> func.HttpResponse:
9+
result = {
10+
'function_directory': context.function_directory,
11+
'function_name': context.function_name
12+
}
13+
return func.HttpResponse(body=json.dumps(result),
14+
mimetype='application/json')
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"scriptFile": "__init__.py",
3+
"bindings": [
4+
{
5+
"type": "httpTrigger",
6+
"direction": "in",
7+
"name": "req"
8+
},
9+
{
10+
"type": "http",
11+
"direction": "out",
12+
"name": "$return"
13+
}
14+
]
15+
}

0 commit comments

Comments
 (0)