Skip to content

Remove thread count limitation on workers #7671

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/WebJobs.Script/Utility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Dynamic;
using System.Globalization;
using System.IO;
Expand Down Expand Up @@ -261,6 +262,11 @@ public static IReadOnlyDictionary<string, string> ToStringValues(this IReadOnlyD
return data.ToDictionary(p => p.Key, p => p.Value != null ? p.Value.ToString() : null, StringComparer.OrdinalIgnoreCase);
}

public static string GetValue(this StringDictionary dictionary, string key)
{
return dictionary.ContainsKey(key) ? dictionary[key] : null;
}

// "Namespace.Class.Method" --> "Method"
public static string GetFunctionShortName(string functionName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Script.Workers
Expand Down Expand Up @@ -47,6 +48,7 @@ public virtual Process CreateWorkerProcess(WorkerContext context)
WorkingDirectory = context.WorkingDirectory,
Arguments = GetArguments(context),
};

var processEnvVariables = context.EnvironmentVariables;
if (processEnvVariables != null && processEnvVariables.Any())
{
Expand All @@ -57,6 +59,9 @@ public virtual Process CreateWorkerProcess(WorkerContext context)
}
startInfo.Arguments = SanitizeExpandedArgument(startInfo.Arguments);
}

ApplyWorkerConcurrencyLimits(startInfo);

return new Process { StartInfo = startInfo };
}

Expand Down Expand Up @@ -89,5 +94,21 @@ internal string SanitizeExpandedArgument(string envExpandedString)
}
return envExpandedString;
}

internal void ApplyWorkerConcurrencyLimits(ProcessStartInfo startInfo)
{
// Set higher concurrency limits for Python and Powershell language workers for V4
string functionWorkerRuntime = startInfo.EnvironmentVariables.GetValue(RpcWorkerConstants.FunctionWorkerRuntimeSettingName);
if (string.IsNullOrEmpty(startInfo.EnvironmentVariables.GetValue(RpcWorkerConstants.PythonThreadpoolThreadCount)) &&
string.Equals(functionWorkerRuntime, RpcWorkerConstants.PythonLanguageWorkerName, StringComparison.OrdinalIgnoreCase))
{
startInfo.EnvironmentVariables[RpcWorkerConstants.PythonThreadpoolThreadCount] = RpcWorkerConstants.DefaultConcurrencyLimit;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can these be exposed in worker.config.json instead of AppSettings?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, these settings are user-configurable per app. They are not new, they already exist and the language workers expect them.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DefaultConcurrencyLimit could be a part of worker.config, but adding them doesn't add any value as they can be overwritten by the Cx.

}
else if (string.IsNullOrEmpty(startInfo.EnvironmentVariables.GetValue(RpcWorkerConstants.PSWorkerInProcConcurrencyUpperBound)) &&
string.Equals(functionWorkerRuntime, RpcWorkerConstants.PowerShellLanguageWorkerName, StringComparison.OrdinalIgnoreCase))
{
startInfo.EnvironmentVariables[RpcWorkerConstants.PSWorkerInProcConcurrencyUpperBound] = RpcWorkerConstants.DefaultConcurrencyLimit;
}
}
}
}
5 changes: 5 additions & 0 deletions src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,10 @@ public static class RpcWorkerConstants
public const string DotNetExecutableName = "dotnet";
public const string DotNetExecutableNameWithExtension = DotNetExecutableName + ".exe";
public const string DotNetFolderName = "dotnet";

// Language worker concurrency limits
public const string PythonThreadpoolThreadCount = "PYTHON_THREADPOOL_THREAD_COUNT";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add the link to the external documentation of both these settings - to give future context on where these things came from.

public const string PSWorkerInProcConcurrencyUpperBound = "PSWorkerInProcConcurrencyUpperBound";
public const string DefaultConcurrencyLimit = "1000";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Microsoft.Azure.WebJobs.Script;
using Microsoft.Azure.WebJobs.Script.Workers;
using Microsoft.Azure.WebJobs.Script.Workers.Http;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
Expand Down Expand Up @@ -115,5 +117,25 @@ public IDictionary<string, string> GetTestEnvVars()
{
return new Dictionary<string, string>() { { "rpckey1", "rpcvalue1" }, { "rpckey2", "rpcvalue2" } };
}

[Theory]
[InlineData(RpcWorkerConstants.PythonLanguageWorkerName, RpcWorkerConstants.PythonThreadpoolThreadCount, "1", "1")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need any special test for values more than 1000?

[InlineData(RpcWorkerConstants.PythonLanguageWorkerName, RpcWorkerConstants.PythonThreadpoolThreadCount, null, RpcWorkerConstants.DefaultConcurrencyLimit)]
[InlineData(RpcWorkerConstants.PowerShellLanguageWorkerName, RpcWorkerConstants.PSWorkerInProcConcurrencyUpperBound, "1", "1")]
[InlineData(RpcWorkerConstants.PowerShellLanguageWorkerName, RpcWorkerConstants.PSWorkerInProcConcurrencyUpperBound, null, RpcWorkerConstants.DefaultConcurrencyLimit)]
[InlineData(RpcWorkerConstants.NodeLanguageWorkerName, "test", null, null)]
public void DefaultWorkerProcessFactory_ApplyWorkerConcurrencyLimits_WorksAsExpected(string runtime, string name, string value, string expectedValue)
{
DefaultWorkerProcessFactory defaultWorkerProcessFactory = new DefaultWorkerProcessFactory(_loggerFactory);

Process process = defaultWorkerProcessFactory.CreateWorkerProcess(TestWorkerContexts.ToList()[1][0] as WorkerContext);
process.StartInfo.EnvironmentVariables[RpcWorkerConstants.FunctionWorkerRuntimeSettingName] = runtime;
if (!string.IsNullOrEmpty(value))
{
process.StartInfo.EnvironmentVariables[name] = value;
}
defaultWorkerProcessFactory.ApplyWorkerConcurrencyLimits(process.StartInfo);
Assert.Equal(process.StartInfo.EnvironmentVariables.GetValue(name), expectedValue);
}
}
}