diff --git a/doc/source/whatsnew/v3.0.0.rst b/doc/source/whatsnew/v3.0.0.rst index 5ff1ea9d194f6..3f3a743717909 100644 --- a/doc/source/whatsnew/v3.0.0.rst +++ b/doc/source/whatsnew/v3.0.0.rst @@ -33,6 +33,7 @@ Other enhancements :func:`pandas.concat`. - :class:`pandas.api.typing.FrozenList` is available for typing the outputs of :attr:`MultiIndex.names`, :attr:`MultiIndex.codes` and :attr:`MultiIndex.levels` (:issue:`58237`) - :class:`pandas.api.typing.SASReader` is available for typing the output of :func:`read_sas` (:issue:`55689`) +- :meth:`DataFrame.apply` accepts Numba as an engine by passing the JIT decorator directly, e.g. ``df.apply(func, engine=numba.jit)`` (:issue:`61458`) - Added :meth:`.Styler.to_typst` to write Styler objects to file, buffer or string in Typst format (:issue:`57617`) - Added missing :meth:`pandas.Series.info` to API reference (:issue:`60926`) - :class:`pandas.api.typing.NoDefault` is available for typing ``no_default`` diff --git a/pandas/core/_numba/executor.py b/pandas/core/_numba/executor.py index 3f3ebe8dbe023..b28fd58a012d5 100644 --- a/pandas/core/_numba/executor.py +++ b/pandas/core/_numba/executor.py @@ -18,14 +18,14 @@ @functools.cache -def generate_apply_looper(func, nopython=True, nogil=True, parallel=False): +def generate_apply_looper(func, decorator: Callable): if TYPE_CHECKING: import numba else: numba = import_optional_dependency("numba") nb_compat_func = jit_user_function(func) - @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) + @decorator # pyright: ignore[reportUntypedFunctionDecorator] def nb_looper(values, axis, *args): # Operate on the first row/col in order to get # the output shape diff --git a/pandas/core/apply.py b/pandas/core/apply.py index e228d20b359c6..f74930f95c9a6 100644 --- a/pandas/core/apply.py +++ b/pandas/core/apply.py @@ -50,6 +50,7 @@ import pandas.core.common as com from pandas.core.construction import ensure_wrapped_if_datetimelike from pandas.core.util.numba_ import ( + extract_numba_options, get_jit_arguments, prepare_function_arguments, ) @@ -179,6 +180,231 @@ def apply( """ +class NumbaExecutionEngine(BaseExecutionEngine): + """ + Numba-based execution engine for pandas apply and map operations. + """ + + @staticmethod + def map( + data: np.ndarray | Series | DataFrame, + func, + args: tuple, + kwargs: dict, + decorator: Callable | None, + skip_na: bool, + ): + """ + Elementwise map for the Numba engine. Currently not supported. + """ + raise NotImplementedError( + "The Numba engine is not implemented for the map method yet." + ) + + @staticmethod + def apply( + data: np.ndarray | Series | DataFrame, + func, + args: tuple, + kwargs: dict, + decorator: Callable, + axis: int | str, + ): + """ + Apply `func` along the given axis using Numba. 
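+
+        Parameters
+        ----------
+        data : np.ndarray, Series or DataFrame
+            Raw values when ``raw=True``, otherwise the pandas object
+            being applied over.
+        func : callable
+            User-defined function to apply to each column or row.
+        args : tuple
+            Positional arguments passed to ``func``.
+        kwargs : dict
+            Keyword arguments passed to ``func``.
+        decorator : callable
+            Numba jit decorator applied to the generated apply loop.
+        axis : int or str
+            ``0`` or ``"index"`` applies ``func`` to each column,
+            otherwise to each row.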
+ """ + + NumbaExecutionEngine.check_numba_support(func) + + # normalize axis values + if axis in (0, "index"): + axis = 0 + else: + axis = 1 + + # check for data typing + if not isinstance(data, np.ndarray): + if data.empty: + return data.copy() # mimic apply_empty_result() + + engine_kwargs = extract_numba_options(decorator) + + NumbaExecutionEngine.validate_values_for_numba_raw_false( + data, get_jit_arguments(engine_kwargs) + ) + + return NumbaExecutionEngine.apply_raw_false( + data, func, args, kwargs, decorator, axis + ) + + looper_args, looper_kwargs = prepare_function_arguments( + func, + args, + kwargs, + num_required_args=1, + ) + # error: Argument 1 to "__call__" of "_lru_cache_wrapper" has + # incompatible type "Callable[..., Any] | str | list[Callable + # [..., Any] | str] | dict[Hashable,Callable[..., Any] | str | + # list[Callable[..., Any] | str]]"; expected "Hashable" + numba_looper = generate_apply_looper( + func, + decorator, + ) + result = numba_looper(data, axis, *looper_args) + # If we made the result 2-D, squeeze it back to 1-D + return np.squeeze(result) + + @staticmethod + def check_numba_support(func): + if is_list_like(func): + raise NotImplementedError( + "the 'numba' engine doesn't support lists of callables yet" + ) + + elif isinstance(func, str): + raise NotImplementedError( + "the 'numba' engine doesn't support using " + "a string as the callable function" + ) + + elif isinstance(func, np.ufunc): + raise NotImplementedError( + "the 'numba' engine doesn't support " + "using a numpy ufunc as the callable function" + ) + + @staticmethod + def apply_raw_false( + data: Series | DataFrame, + func, + args: tuple, + kwargs: dict, + decorator: Callable, + axis: int | str, + ): + from pandas import ( + DataFrame, + Series, + ) + + results = NumbaExecutionEngine.apply_with_numba( + data, func, args, kwargs, decorator, axis + ) + + if results: + sample = next(iter(results.values())) + if isinstance(sample, Series): + df_result = DataFrame.from_dict( + results, orient="index" if axis == 1 else "columns" + ) + return df_result + else: + return Series(results) + + return DataFrame() if isinstance(data, DataFrame) else Series() + + @staticmethod + def validate_values_for_numba_raw_false( + data: Series | DataFrame, engine_kwargs: dict[str, bool] + ) -> None: + from pandas import Series + + if engine_kwargs.get("parallel", False): + raise NotImplementedError( + "Parallel apply is not supported when raw=False and engine='numba'" + ) + if not data.index.is_unique or not data.columns.is_unique: + raise NotImplementedError( + "The index/columns must be unique when raw=False and engine='numba'" + ) + + if isinstance(data, Series): + if not is_numeric_dtype(data.dtype): + raise ValueError( + f"Series must have a numeric dtype. Found '{data.dtype}' instead" + ) + if is_extension_array_dtype(data.dtype): + raise ValueError( + "Series is backed by an extension array, " + "which is not supported by the numba engine." + ) + else: + for colname, dtype in data.dtypes.items(): + if not is_numeric_dtype(dtype): + raise ValueError( + f"Column {colname} must have a numeric dtype. " + f"Found '{dtype}' instead" + ) + if is_extension_array_dtype(dtype): + raise ValueError( + f"Column {colname} is backed by an extension array, " + f"which is not supported by the numba engine." 
+ ) + + @staticmethod + @functools.cache + def generate_numba_apply_func( + func, axis, decorator: Callable + ) -> Callable[[npt.NDArray, Index, Index], dict[int, Any]]: + numba = import_optional_dependency("numba") + from pandas import Series + from pandas.core._numba.extensions import maybe_cast_str + + jitted_udf = numba.extending.register_jitable(func) + + @decorator # pyright: ignore[reportUntypedFunctionDecorator] + def numba_func(values, col_names_index, index, *args): + results = {} + for i in range(values.shape[1 - axis]): + if axis == 0 or axis == "index": + arr = values[:, i] + result_key = index[i] + arr_index = col_names_index + else: + arr = values[i].copy() + result_key = index[i] + arr_index = col_names_index + ser = Series( + arr, + index=arr_index, + name=maybe_cast_str(result_key), + ) + results[result_key] = jitted_udf(ser, *args) + + return results + + return numba_func + + @staticmethod + def apply_with_numba(data, func, args, kwargs, decorator, axis=0) -> dict[int, Any]: + func = cast(Callable, func) + args, kwargs = prepare_function_arguments( + func, args, kwargs, num_required_args=1 + ) + nb_func = NumbaExecutionEngine.generate_numba_apply_func(func, axis, decorator) + + from pandas.core._numba.extensions import set_numba_data + + # Convert from numba dict to regular dict + # Our isinstance checks in the df constructor don't pass for numbas typed dict + + if axis == 0 or axis == "index": + col_names_index = data.index + result_index = data.columns + else: + col_names_index = data.columns + result_index = data.index + + with ( + set_numba_data(result_index) as index, + set_numba_data(col_names_index) as columns, + ): + res = dict(nb_func(data.values, columns, index, *args)) + + return res + + def frame_apply( obj: DataFrame, func: AggFuncType, @@ -186,8 +412,6 @@ def frame_apply( raw: bool = False, result_type: str | None = None, by_row: Literal[False, "compat"] = "compat", - engine: str = "python", - engine_kwargs: dict[str, bool] | None = None, args=None, kwargs=None, ) -> FrameApply: @@ -211,8 +435,6 @@ def frame_apply( raw=raw, result_type=result_type, by_row=by_row, - engine=engine, - engine_kwargs=engine_kwargs, args=args, kwargs=kwargs, ) @@ -229,8 +451,6 @@ def __init__( result_type: str | None, *, by_row: Literal[False, "compat", "_compat"] = "compat", - engine: str = "python", - engine_kwargs: dict[str, bool] | None = None, args, kwargs, ) -> None: @@ -243,9 +463,6 @@ def __init__( self.args = args or () self.kwargs = kwargs or {} - self.engine = engine - self.engine_kwargs = {} if engine_kwargs is None else engine_kwargs - if result_type not in [None, "reduce", "broadcast", "expand"]: raise ValueError( "invalid value for result_type, must be one " @@ -703,12 +920,6 @@ def apply_list_or_dict_like(self) -> DataFrame | Series: Result when self.func is a list-like or dict-like, None otherwise. """ - if self.engine == "numba": - raise NotImplementedError( - "The 'numba' engine doesn't support list-like/" - "dict likes of callables yet." 
- ) - if self.axis == 1 and isinstance(self.obj, ABCDataFrame): return self.obj.T.apply(self.func, 0, args=self.args, **self.kwargs).T @@ -870,8 +1081,6 @@ def __init__( result_type: str | None, *, by_row: Literal[False, "compat"] = False, - engine: str = "python", - engine_kwargs: dict[str, bool] | None = None, args, kwargs, ) -> None: @@ -883,8 +1092,6 @@ def __init__( raw, result_type, by_row=by_row, - engine=engine, - engine_kwargs=engine_kwargs, args=args, kwargs=kwargs, ) @@ -907,32 +1114,6 @@ def result_columns(self) -> Index: def series_generator(self) -> Generator[Series]: pass - @staticmethod - @functools.cache - @abc.abstractmethod - def generate_numba_apply_func( - func, nogil=True, nopython=True, parallel=False - ) -> Callable[[npt.NDArray, Index, Index], dict[int, Any]]: - pass - - @abc.abstractmethod - def apply_with_numba(self): - pass - - def validate_values_for_numba(self) -> None: - # Validate column dtyps all OK - for colname, dtype in self.obj.dtypes.items(): - if not is_numeric_dtype(dtype): - raise ValueError( - f"Column {colname} must have a numeric dtype. " - f"Found '{dtype}' instead" - ) - if is_extension_array_dtype(dtype): - raise ValueError( - f"Column {colname} is backed by an extension array, " - f"which is not supported by the numba engine." - ) - @abc.abstractmethod def wrap_results_for_axis( self, results: ResType, res_index: Index @@ -958,10 +1139,6 @@ def apply(self) -> DataFrame | Series: # dispatch to handle list-like or dict-like if is_list_like(self.func): - if self.engine == "numba": - raise NotImplementedError( - "the 'numba' engine doesn't support lists of callables yet" - ) return self.apply_list_or_dict_like() # all empty @@ -970,20 +1147,10 @@ def apply(self) -> DataFrame | Series: # string dispatch if isinstance(self.func, str): - if self.engine == "numba": - raise NotImplementedError( - "the 'numba' engine doesn't support using " - "a string as the callable function" - ) return self.apply_str() # ufunc elif isinstance(self.func, np.ufunc): - if self.engine == "numba": - raise NotImplementedError( - "the 'numba' engine doesn't support " - "using a numpy ufunc as the callable function" - ) with np.errstate(all="ignore"): results = self.obj._mgr.apply("apply", func=self.func) # _constructor will retain self.index and self.columns @@ -991,10 +1158,6 @@ def apply(self) -> DataFrame | Series: # broadcasting if self.result_type == "broadcast": - if self.engine == "numba": - raise NotImplementedError( - "the 'numba' engine doesn't support result_type='broadcast'" - ) return self.apply_broadcast(self.obj) # one axis empty @@ -1003,7 +1166,7 @@ def apply(self) -> DataFrame | Series: # raw elif self.raw: - return self.apply_raw(engine=self.engine, engine_kwargs=self.engine_kwargs) + return self.apply_raw() return self.apply_standard() @@ -1076,7 +1239,7 @@ def apply_empty_result(self): else: return self.obj.copy() - def apply_raw(self, engine="python", engine_kwargs=None): + def apply_raw(self): """apply to the values as a numpy array""" def wrap_function(func): @@ -1094,32 +1257,13 @@ def wrapper(*args, **kwargs): return wrapper - if engine == "numba": - args, kwargs = prepare_function_arguments( - self.func, # type: ignore[arg-type] - self.args, - self.kwargs, - num_required_args=1, - ) - # error: Argument 1 to "__call__" of "_lru_cache_wrapper" has - # incompatible type "Callable[..., Any] | str | list[Callable - # [..., Any] | str] | dict[Hashable,Callable[..., Any] | str | - # list[Callable[..., Any] | str]]"; expected "Hashable" - nb_looper = 
generate_apply_looper( - self.func, # type: ignore[arg-type] - **get_jit_arguments(engine_kwargs), - ) - result = nb_looper(self.values, self.axis, *args) - # If we made the result 2-D, squeeze it back to 1-D - result = np.squeeze(result) - else: - result = np.apply_along_axis( - wrap_function(self.func), - self.axis, - self.values, - *self.args, - **self.kwargs, - ) + result = np.apply_along_axis( + wrap_function(self.func), + self.axis, + self.values, + *self.args, + **self.kwargs, + ) # TODO: mixed type case if result.ndim == 2: @@ -1156,10 +1300,7 @@ def apply_broadcast(self, target: DataFrame) -> DataFrame: return result def apply_standard(self): - if self.engine == "python": - results, res_index = self.apply_series_generator() - else: - results, res_index = self.apply_series_numba() + results, res_index = self.apply_series_generator() # wrap results return self.wrap_results(results, res_index) @@ -1181,19 +1322,6 @@ def apply_series_generator(self) -> tuple[ResType, Index]: return results, res_index - def apply_series_numba(self): - if self.engine_kwargs.get("parallel", False): - raise NotImplementedError( - "Parallel apply is not supported when raw=False and engine='numba'" - ) - if not self.obj.index.is_unique or not self.columns.is_unique: - raise NotImplementedError( - "The index/columns must be unique when raw=False and engine='numba'" - ) - self.validate_values_for_numba() - results = self.apply_with_numba() - return results, self.result_index - def wrap_results(self, results: ResType, res_index: Index) -> DataFrame | Series: from pandas import Series @@ -1233,54 +1361,6 @@ class FrameRowApply(FrameApply): def series_generator(self) -> Generator[Series]: return (self.obj._ixs(i, axis=1) for i in range(len(self.columns))) - @staticmethod - @functools.cache - def generate_numba_apply_func( - func, nogil=True, nopython=True, parallel=False - ) -> Callable[[npt.NDArray, Index, Index], dict[int, Any]]: - numba = import_optional_dependency("numba") - from pandas import Series - - # Import helper from extensions to cast string object -> np strings - # Note: This also has the side effect of loading our numba extensions - from pandas.core._numba.extensions import maybe_cast_str - - jitted_udf = numba.extending.register_jitable(func) - - # Currently the parallel argument doesn't get passed through here - # (it's disabled) since the dicts in numba aren't thread-safe. 
- @numba.jit(nogil=nogil, nopython=nopython, parallel=parallel) - def numba_func(values, col_names, df_index, *args): - results = {} - for j in range(values.shape[1]): - # Create the series - ser = Series( - values[:, j], index=df_index, name=maybe_cast_str(col_names[j]) - ) - results[j] = jitted_udf(ser, *args) - return results - - return numba_func - - def apply_with_numba(self) -> dict[int, Any]: - func = cast(Callable, self.func) - args, kwargs = prepare_function_arguments( - func, self.args, self.kwargs, num_required_args=1 - ) - nb_func = self.generate_numba_apply_func( - func, **get_jit_arguments(self.engine_kwargs) - ) - from pandas.core._numba.extensions import set_numba_data - - index = self.obj.index - columns = self.obj.columns - - # Convert from numba dict to regular dict - # Our isinstance checks in the df constructor don't pass for numbas typed dict - with set_numba_data(index) as index, set_numba_data(columns) as columns: - res = dict(nb_func(self.values, columns, index, *args)) - return res - @property def result_index(self) -> Index: return self.columns @@ -1374,57 +1454,6 @@ def series_generator(self) -> Generator[Series]: mgr.blocks[0].refs = BlockValuesRefs(mgr.blocks[0]) yield ser - @staticmethod - @functools.cache - def generate_numba_apply_func( - func, nogil=True, nopython=True, parallel=False - ) -> Callable[[npt.NDArray, Index, Index], dict[int, Any]]: - numba = import_optional_dependency("numba") - from pandas import Series - from pandas.core._numba.extensions import maybe_cast_str - - jitted_udf = numba.extending.register_jitable(func) - - @numba.jit(nogil=nogil, nopython=nopython, parallel=parallel) - def numba_func(values, col_names_index, index, *args): - results = {} - # Currently the parallel argument doesn't get passed through here - # (it's disabled) since the dicts in numba aren't thread-safe. 
- for i in range(values.shape[0]): - # Create the series - # TODO: values corrupted without the copy - ser = Series( - values[i].copy(), - index=col_names_index, - name=maybe_cast_str(index[i]), - ) - results[i] = jitted_udf(ser, *args) - - return results - - return numba_func - - def apply_with_numba(self) -> dict[int, Any]: - func = cast(Callable, self.func) - args, kwargs = prepare_function_arguments( - func, self.args, self.kwargs, num_required_args=1 - ) - nb_func = self.generate_numba_apply_func( - func, **get_jit_arguments(self.engine_kwargs) - ) - - from pandas.core._numba.extensions import set_numba_data - - # Convert from numba dict to regular dict - # Our isinstance checks in the df constructor don't pass for numbas typed dict - with ( - set_numba_data(self.obj.index) as index, - set_numba_data(self.columns) as columns, - ): - res = dict(nb_func(self.values, columns, index, *args)) - - return res - @property def result_index(self) -> Index: return self.index @@ -1637,11 +1666,6 @@ def agg_or_apply_list_like( def agg_or_apply_dict_like( self, op_name: Literal["agg", "apply"] ) -> DataFrame | Series: - from pandas.core.groupby.generic import ( - DataFrameGroupBy, - SeriesGroupBy, - ) - assert op_name in ["agg", "apply"] obj = self.obj @@ -1656,14 +1680,6 @@ def agg_or_apply_dict_like( selected_obj = obj._selected_obj selection = obj._selection - is_groupby = isinstance(obj, (DataFrameGroupBy, SeriesGroupBy)) - - # Numba Groupby engine/engine-kwargs passthrough - if is_groupby: - engine = self.kwargs.get("engine", None) - engine_kwargs = self.kwargs.get("engine_kwargs", None) - kwargs.update({"engine": engine, "engine_kwargs": engine_kwargs}) - with com.temp_setattr( obj, "as_index", True, condition=hasattr(obj, "as_index") ): diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 8053c17437c5e..dc1ecfd2cf6ce 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -129,7 +129,10 @@ roperator, ) from pandas.core.accessor import Accessor -from pandas.core.apply import reconstruct_and_relabel_result +from pandas.core.apply import ( + NumbaExecutionEngine, + reconstruct_and_relabel_result, +) from pandas.core.array_algos.take import take_2d_multi from pandas.core.arraylike import OpsMixin from pandas.core.arrays import ( @@ -10616,13 +10619,16 @@ def apply( significant amount of time to run. Fast functions are unlikely to run faster with JIT compilation. 
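+
+        A numba JIT decorator can also be passed directly as ``engine``,
+        e.g. ``df.apply(func, engine=numba.jit)`` or a configured variant
+        such as ``numba.jit(nogil=True)``; the decorator's compilation
+        options are then applied to ``func``.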
""" + if engine == "numba": + numba = import_optional_dependency("numba") + numba_jit = numba.jit(**engine_kwargs or {}) + numba_jit.__pandas_udf__ = NumbaExecutionEngine + engine = numba_jit + if engine is None or isinstance(engine, str): from pandas.core.apply import frame_apply - if engine is None: - engine = "python" - - if engine not in ["python", "numba"]: + if engine not in ["python", None]: raise ValueError(f"Unknown engine '{engine}'") op = frame_apply( @@ -10632,12 +10638,11 @@ def apply( raw=raw, result_type=result_type, by_row=by_row, - engine=engine, - engine_kwargs=engine_kwargs, args=args, kwargs=kwargs, ) return op.apply().__finalize__(self, method="apply") + elif hasattr(engine, "__pandas_udf__"): if result_type is not None: raise NotImplementedError( diff --git a/pandas/core/util/numba_.py b/pandas/core/util/numba_.py index d3f00c08e0e2c..bc56bde5a15d6 100644 --- a/pandas/core/util/numba_.py +++ b/pandas/core/util/numba_.py @@ -148,3 +148,24 @@ def prepare_function_arguments( args = args[num_required_args:] return args, kwargs + + +def extract_numba_options(decorator: Callable) -> dict: + """ + Extract targetoptions from a numba.jit decorator + """ + try: + closure = decorator.__closure__ + if closure is None: + return {} + freevars = decorator.__code__.co_freevars + if "targetoptions" not in freevars: + return {} + idx = freevars.index("targetoptions") + cell = closure[idx] + targetoptions = cell.cell_contents + if isinstance(targetoptions, dict): + return targetoptions + return {} + except Exception: + return {} diff --git a/pandas/tests/apply/test_numba.py b/pandas/tests/apply/test_numba.py index 75bc3f5b74b9d..5bb105bb59d5f 100644 --- a/pandas/tests/apply/test_numba.py +++ b/pandas/tests/apply/test_numba.py @@ -10,6 +10,7 @@ Index, ) import pandas._testing as tm +from pandas.core.util.numba_ import extract_numba_options from pandas.util.version import Version pytestmark = [td.skip_if_no("numba"), pytest.mark.single_cpu, pytest.mark.skipif()] @@ -127,3 +128,16 @@ def test_numba_unsupported_dtypes(apply_axis): "which is not supported by the numba engine.", ): df["c"].to_frame().apply(f, engine="numba", axis=apply_axis) + + +@pytest.mark.parametrize( + "jit_args", + [ + {"parallel": True, "nogil": True}, + {"parallel": False, "nogil": False}, + ], +) +def test_extract_numba_options_from_user_decorated_function(jit_args): + extracted = extract_numba_options(numba.jit(**jit_args)) + for k, v in jit_args.items(): + assert extracted.get(k) == v