diff --git a/CHANGELOG.md b/CHANGELOG.md index e34b073999..974e5d3d43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,39 @@ [1]: https://pypi.org/project/bigframes/#history +## [0.9.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v0.8.0...v0.9.0) (2023-10-18) + + +### ⚠ BREAKING CHANGES + +* rename `bigframes.pandas.reset_session` to `close_session` ([#101](https://github.com/googleapis/python-bigquery-dataframes/issues/101)) + +### Features + +* Add `bigframes.options.bigquery.application_name` for partner attribution ([#117](https://github.com/googleapis/python-bigquery-dataframes/issues/117)) ([52d64ff](https://github.com/googleapis/python-bigquery-dataframes/commit/52d64ffdbbab16b1d94974b543ce9080be1ec0d1)) +* Add AtIndexer getitems ([#107](https://github.com/googleapis/python-bigquery-dataframes/issues/107)) ([752b01f](https://github.com/googleapis/python-bigquery-dataframes/commit/752b01ff9df114c54ed58eb96956e9ce34a8ed47)) +* Rename `bigframes.pandas.reset_session` to `close_session` ([#101](https://github.com/googleapis/python-bigquery-dataframes/issues/101)) ([36693bf](https://github.com/googleapis/python-bigquery-dataframes/commit/36693bff398c23e179d9bde95d52cbaddaf85c45)) +* Send BigQuery cancel request when canceling bigframes process ([#103](https://github.com/googleapis/python-bigquery-dataframes/issues/103)) ([e325fbb](https://github.com/googleapis/python-bigquery-dataframes/commit/e325fbb1c91e040d87df10f7d4d5ce53f7c052cb)) +* Support external packages in `remote_function` ([#98](https://github.com/googleapis/python-bigquery-dataframes/issues/98)) ([ec10c4a](https://github.com/googleapis/python-bigquery-dataframes/commit/ec10c4a5a7833c42e28fe9e7b734bc0c4fb84b6e)) +* Use ArrowDtype for STRUCT columns in `to_pandas` ([#85](https://github.com/googleapis/python-bigquery-dataframes/issues/85)) ([9238fad](https://github.com/googleapis/python-bigquery-dataframes/commit/9238fadcfa7e843be6564813ff3131893b79f8b0)) + + +### Bug Fixes + +* Support multiindex for three loc getitem overloads ([#113](https://github.com/googleapis/python-bigquery-dataframes/issues/113)) ([68e3cd3](https://github.com/googleapis/python-bigquery-dataframes/commit/68e3cd37258084d045ea1075e5e61df12c28faac)) + + +### Performance Improvements + +* If primary keys are defined, `read_gbq` avoids copying table data ([#112](https://github.com/googleapis/python-bigquery-dataframes/issues/112)) ([e6c0cd1](https://github.com/googleapis/python-bigquery-dataframes/commit/e6c0cd1777736e0fa7285da59625fbac487573bd)) + + +### Documentation + +* Add documentation for `Series.struct.field` and `Series.struct.explode` ([#114](https://github.com/googleapis/python-bigquery-dataframes/issues/114)) ([a6dab9c](https://github.com/googleapis/python-bigquery-dataframes/commit/a6dab9cdb7dd0e56c93ca96b665ab1be1baac5e5)) +* Add open-source link in API doc ([#106](https://github.com/googleapis/python-bigquery-dataframes/issues/106)) ([db51fe3](https://github.com/googleapis/python-bigquery-dataframes/commit/db51fe340f644a0d7c911c11d92c8299a4be3446)) +* Update ML overview API doc ([#105](https://github.com/googleapis/python-bigquery-dataframes/issues/105)) ([1b3f3a5](https://github.com/googleapis/python-bigquery-dataframes/commit/1b3f3a5374915b2833c6c1ac05670e9708f07bff)) + ## [0.8.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v0.7.0...v0.8.0) (2023-10-12) diff --git a/README.rst b/README.rst index 77c42e4325..5ddb4a7639 100644 --- a/README.rst +++ b/README.rst @@ -13,6 +13,7 @@ BigQuery 
DataFrames is an open-source package. You can run Documentation ------------- +* `BigQuery DataFrames source code (GitHub) `_ * `BigQuery DataFrames sample notebooks `_ * `BigQuery DataFrames API reference `_ * `BigQuery documentation `_ @@ -63,7 +64,7 @@ auto-populates ``bf.options.bigquery.location`` if the user starts with directly or in a SQL statement. If you want to reset the location of the created DataFrame or Series objects, -you can reset the session by executing ``bigframes.pandas.reset_session()``. +you can close the session by executing ``bigframes.pandas.close_session()``. After that, you can reuse ``bigframes.pandas.options.bigquery.location`` to specify another location. @@ -94,10 +95,18 @@ using the and the `bigframes.ml.compose module `_. BigQuery DataFrames offers the following transformations: -* Use the `OneHotEncoder class `_ - in the ``bigframes.ml.preprocessing`` module to transform categorical values into numeric format. +* Use the `KBinsDiscretizer class `_ + in the ``bigframes.ml.preprocessing`` module to bin continuous data into intervals. +* Use the `LabelEncoder class `_ + in the ``bigframes.ml.preprocessing`` module to normalize the target labels as integer values. +* Use the `MaxAbsScaler class `_ + in the ``bigframes.ml.preprocessing`` module to scale each feature to the range ``[-1, 1]`` by its maximum absolute value. +* Use the `MinMaxScaler class `_ + in the ``bigframes.ml.preprocessing`` module to standardize features by scaling each feature to the range ``[0, 1]``. * Use the `StandardScaler class `_ in the ``bigframes.ml.preprocessing`` module to standardize features by removing the mean and scaling to unit variance. +* Use the `OneHotEncoder class `_ + in the ``bigframes.ml.preprocessing`` module to transform categorical values into numeric format. * Use the `ColumnTransformer class `_ in the ``bigframes.ml.compose`` module to apply transformers to DataFrames columns. @@ -335,7 +344,7 @@ sessions ; when this happens, you can’t use previously created DataFrame or Series objects and must re-create them using a new BigQuery DataFrames session. You can do this by running -``bigframes.pandas.reset_session()`` and then re-running the BigQuery +``bigframes.pandas.close_session()`` and then re-running the BigQuery DataFrames expressions. diff --git a/bigframes/__init__.py b/bigframes/__init__.py index 3e54a6d090..8f41790072 100644 --- a/bigframes/__init__.py +++ b/bigframes/__init__.py @@ -16,7 +16,7 @@ from bigframes._config import options from bigframes._config.bigquery_options import BigQueryOptions -from bigframes.core.global_session import get_global_session, reset_session +from bigframes.core.global_session import close_session, get_global_session from bigframes.session import connect, Session from bigframes.version import __version__ @@ -24,7 +24,7 @@ "options", "BigQueryOptions", "get_global_session", - "reset_session", + "close_session", "connect", "Session", "__version__", diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index eb56de826a..d0cce9492b 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -23,7 +23,7 @@ SESSION_STARTED_MESSAGE = ( "Cannot change '{attribute}' once a session has started. " - "Call bigframes.pandas.reset_session() first, if you are using the bigframes.pandas API." + "Call bigframes.pandas.close_session() first, if you are using the bigframes.pandas API." 
) @@ -37,14 +37,33 @@ def __init__( location: Optional[str] = None, bq_connection: Optional[str] = None, use_regional_endpoints: bool = False, + application_name: Optional[str] = None, ): self._credentials = credentials self._project = project self._location = location self._bq_connection = bq_connection self._use_regional_endpoints = use_regional_endpoints + self._application_name = application_name self._session_started = False + @property + def application_name(self) -> Optional[str]: + """The application name to amend to the user-agent sent to Google APIs. + + Recommended format is ``"appplication-name/major.minor.patch_version"`` + or ``"(gpn:PartnerName;)"`` for official Google partners. + """ + return self._application_name + + @application_name.setter + def application_name(self, value: Optional[str]): + if self._session_started and self._application_name != value: + raise ValueError( + SESSION_STARTED_MESSAGE.format(attribute="application_name") + ) + self._application_name = value + @property def credentials(self) -> Optional[google.auth.credentials.Credentials]: """The OAuth2 Credentials to use for this client.""" diff --git a/bigframes/clients.py b/bigframes/clients.py index dcac611e8c..4ba9d93d69 100644 --- a/bigframes/clients.py +++ b/bigframes/clients.py @@ -29,8 +29,6 @@ ) logger = logging.getLogger(__name__) -_BIGFRAMES_DEFAULT_CONNECTION_ID = "bigframes-default-connection" - class BqConnectionManager: """Manager to handle operations with BQ connections.""" @@ -46,6 +44,23 @@ def __init__( self._bq_connection_client = bq_connection_client self._cloud_resource_manager_client = cloud_resource_manager_client + @classmethod + def resolve_full_connection_name( + cls, connection_name: str, default_project: str, default_location: str + ) -> str: + """Retrieve the full connection name of the form ... + Use default project, location or connection_id when any of them are missing.""" + if connection_name.count(".") == 2: + return connection_name + + if connection_name.count(".") == 1: + return f"{default_project}.{connection_name}" + + if connection_name.count(".") == 0: + return f"{default_project}.{default_location}.{connection_name}" + + raise ValueError(f"Invalid connection name format: {connection_name}.") + def create_bq_connection( self, project_id: str, location: str, connection_id: str, iam_role: str ): @@ -164,25 +179,3 @@ def _get_service_account_if_connection_exists( pass return service_account - - -def get_connection_name_full( - connection_name: Optional[str], default_project: str, default_location: str -) -> str: - """Retrieve the full connection name of the form ... 
- Use default project, location or connection_id when any of them are missing.""" - if connection_name is None: - return ( - f"{default_project}.{default_location}.{_BIGFRAMES_DEFAULT_CONNECTION_ID}" - ) - - if connection_name.count(".") == 2: - return connection_name - - if connection_name.count(".") == 1: - return f"{default_project}.{connection_name}" - - if connection_name.count(".") == 0: - return f"{default_project}.{default_location}.{connection_name}" - - raise ValueError(f"Invalid connection name format: {connection_name}.") diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index ccfd682215..6c78a07f3b 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -211,8 +211,8 @@ def column_ids(self) -> typing.Sequence[str]: return tuple(self._column_names.keys()) @property - def hidden_ordering_columns(self) -> typing.Tuple[ibis_types.Value, ...]: - return self._hidden_ordering_columns + def _hidden_column_ids(self) -> typing.Sequence[str]: + return tuple(self._hidden_ordering_column_names.keys()) @property def _reduced_predicate(self) -> typing.Optional[ibis_types.BooleanValue]: @@ -400,7 +400,7 @@ def _hide_column(self, column_id) -> ArrayValue: expr_builder.ordering = self._ordering.with_column_remap({column_id: new_name}) return expr_builder.build() - def promote_offsets(self) -> typing.Tuple[ArrayValue, str]: + def promote_offsets(self, col_id: str) -> ArrayValue: """ Convenience function to promote copy of column offsets to a value column. Can be used to reset index. """ @@ -408,16 +408,15 @@ def promote_offsets(self) -> typing.Tuple[ArrayValue, str]: ordering = self._ordering if (not ordering.is_sequential) or (not ordering.total_order_col): - return self._project_offsets().promote_offsets() - col_id = bigframes.core.guid.generate_guid() + return self._project_offsets().promote_offsets(col_id) expr_builder = self.builder() expr_builder.columns = [ self._get_any_column(ordering.total_order_col.column_id).name(col_id), *self.columns, ] - return expr_builder.build(), col_id + return expr_builder.build() - def select_columns(self, column_ids: typing.Sequence[str]): + def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue: return self._projection( [self._get_ibis_column(col_id) for col_id in column_ids] ) @@ -807,7 +806,7 @@ def _create_order_columns( elif ordering_mode == "string_encoded": return (self._create_string_ordering_column().name(order_col_name),) elif expose_hidden_cols: - return self.hidden_ordering_columns + return self._hidden_ordering_columns return () def _create_offset_column(self) -> ibis_types.IntegerColumn: diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py index 904da7f312..b0f05f4798 100644 --- a/bigframes/core/block_transforms.py +++ b/bigframes/core/block_transforms.py @@ -40,8 +40,8 @@ def equals(block1: blocks.Block, block2: blocks.Block) -> bool: equality_ids = [] for lcol, rcol in zip(block1.value_columns, block2.value_columns): - lcolmapped = lmap(lcol) - rcolmapped = rmap(rcol) + lcolmapped = lmap[lcol] + rcolmapped = rmap[rcol] joined_block, result_id = joined_block.apply_binary_op( lcolmapped, rcolmapped, ops.eq_nulls_match_op ) @@ -563,8 +563,8 @@ def align_rows( joined_index, (get_column_left, get_column_right) = left_block.index.join( right_block.index, how=join ) - left_columns = [get_column_left(col) for col in left_block.value_columns] - right_columns = [get_column_right(col) for col in right_block.value_columns] + left_columns = [get_column_left[col] for 
col in left_block.value_columns] + right_columns = [get_column_right[col] for col in right_block.value_columns] left_block = joined_index._block.select_columns(left_columns) right_block = joined_index._block.select_columns(right_columns) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 9b49645c71..8966b6189b 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -39,6 +39,7 @@ import bigframes.core.guid as guid import bigframes.core.indexes as indexes import bigframes.core.joins as joins +import bigframes.core.joins.name_resolution as join_names import bigframes.core.ordering as ordering import bigframes.core.utils import bigframes.core.utils as utils @@ -97,7 +98,8 @@ def __init__( "'index_columns' and 'index_labels' must have equal length" ) if len(index_columns) == 0: - expr, new_index_col_id = expr.promote_offsets() + new_index_col_id = guid.generate_guid() + expr = expr.promote_offsets(new_index_col_id) index_columns = [new_index_col_id] self._index_columns = tuple(index_columns) # Index labels don't need complicated hierarchical access so can store as tuple @@ -260,7 +262,8 @@ def reset_index(self, drop: bool = True) -> Block: from Index classes that point to this block. """ block = self - expr, new_index_col_id = self._expr.promote_offsets() + new_index_col_id = guid.generate_guid() + expr = self._expr.promote_offsets(new_index_col_id) if drop: # Even though the index might be part of the ordering, keep that # ordering expression as reset_index shouldn't change the row @@ -374,7 +377,9 @@ def _to_dataframe( cls, result, schema: typing.Mapping[str, bigframes.dtypes.Dtype] ) -> pd.DataFrame: """Convert BigQuery data to pandas DataFrame with specific dtypes.""" + dtypes = bigframes.dtypes.to_pandas_dtypes_overrides(result.schema) df = result.to_dataframe( + dtypes=dtypes, bool_dtype=pd.BooleanDtype(), int_dtype=pd.Int64Dtype(), float_dtype=pd.Float64Dtype(), @@ -833,7 +838,8 @@ def aggregate_all_and_stack( else: # axis_n == 1 # using offsets as identity to group on. 
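Note that `promote_offsets` now takes the destination column id instead of generating and returning one, as the surrounding hunks show. A minimal before/after sketch (assuming an `ArrayValue` bound to `expr` and `bigframes.core.guid` imported as `guid`):

    # before this change: the expression minted its own offsets column id
    expr, offsets_id = expr.promote_offsets()

    # after this change: the caller supplies a deterministic id up front
    offsets_id = guid.generate_guid()
    expr = expr.promote_offsets(offsets_id)
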
# TODO: Allow to promote identity/total_order columns instead for better perf - expr_with_offsets, offset_col = self.expr.promote_offsets() + offset_col = guid.generate_guid() + expr_with_offsets = self.expr.promote_offsets(offset_col) stacked_expr = expr_with_offsets.unpivot( row_labels=self.column_labels.to_list(), index_col_ids=[guid.generate_guid()], @@ -952,9 +958,10 @@ def aggregate( ] by_column_labels = self._get_labels_for_columns(by_value_columns) labels = (*by_column_labels, *aggregate_labels) - result_expr_pruned, offsets_id = result_expr.select_columns( + offsets_id = guid.generate_guid() + result_expr_pruned = result_expr.select_columns( [*by_value_columns, *output_col_ids] - ).promote_offsets() + ).promote_offsets(offsets_id) return ( Block( @@ -975,7 +982,8 @@ def get_stat(self, column_id: str, stat: agg_ops.AggregateOp): aggregations = [(column_id, stat, stat.name) for stat in stats_to_fetch] expr = self.expr.aggregate(aggregations) - expr, offset_index_id = expr.promote_offsets() + offset_index_id = guid.generate_guid() + expr = expr.promote_offsets(offset_index_id) block = Block( expr, index_columns=[offset_index_id], @@ -999,7 +1007,8 @@ def get_corr_stat(self, column_id_left: str, column_id_right: str): ) ] expr = self.expr.corr_aggregate(corr_aggregations) - expr, offset_index_id = expr.promote_offsets() + offset_index_id = guid.generate_guid() + expr = expr.promote_offsets(offset_index_id) block = Block( expr, index_columns=[offset_index_id], @@ -1197,7 +1206,8 @@ def retrieve_repr_request_results( return formatted_df, count, query_job def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]: - expr, result_id = self._expr.promote_offsets() + result_id = guid.generate_guid() + expr = self._expr.promote_offsets(result_id) return ( Block( expr, @@ -1471,67 +1481,76 @@ def merge( "outer", "right", ], - left_col_ids: typing.Sequence[str], - right_col_ids: typing.Sequence[str], + left_join_ids: typing.Sequence[str], + right_join_ids: typing.Sequence[str], sort: bool, suffixes: tuple[str, str] = ("_x", "_y"), ) -> Block: - ( - joined_expr, - coalesced_join_cols, - (get_column_left, get_column_right), - ) = joins.join_by_column( + joined_expr = joins.join_by_column( self.expr, - left_col_ids, + left_join_ids, other.expr, - right_col_ids, + right_join_ids, how=how, - sort=sort, ) + get_column_left, get_column_right = join_names.JOIN_NAME_REMAPPER( + self.expr.column_ids, other.expr.column_ids + ) + result_columns = [] + matching_join_labels = [] + + coalesced_ids = [] + for left_id, right_id in zip(left_join_ids, right_join_ids): + coalesced_id = guid.generate_guid() + joined_expr = joined_expr.project_binary_op( + get_column_left[left_id], + get_column_right[right_id], + ops.coalesce_op, + coalesced_id, + ) + coalesced_ids.append(coalesced_id) + + for col_id in self.value_columns: + if col_id in left_join_ids: + key_part = left_join_ids.index(col_id) + matching_right_id = right_join_ids[key_part] + if ( + self.col_id_to_label[col_id] + == other.col_id_to_label[matching_right_id] + ): + matching_join_labels.append(self.col_id_to_label[col_id]) + result_columns.append(coalesced_ids[key_part]) + else: + result_columns.append(get_column_left[col_id]) + else: + result_columns.append(get_column_left[col_id]) + for col_id in other.value_columns: + if col_id in right_join_ids: + key_part = right_join_ids.index(col_id) + if other.col_id_to_label[matching_right_id] in matching_join_labels: + pass + else: + result_columns.append(get_column_right[col_id]) + else: + 
result_columns.append(get_column_right[col_id]) - # which join key parts should be coalesced - merge_join_key_mask = [ - str(self.col_id_to_label[left_id]) == str(other.col_id_to_label[right_id]) - for left_id, right_id in zip(left_col_ids, right_col_ids) - ] - labels_to_coalesce = [ - self.col_id_to_label[col_id] - for i, col_id in enumerate(left_col_ids) - if merge_join_key_mask[i] - ] - - def left_col_mapping(col_id: str) -> str: - if col_id in left_col_ids: - join_key_part = left_col_ids.index(col_id) - if merge_join_key_mask[join_key_part]: - return coalesced_join_cols[join_key_part] - return get_column_left(col_id) - - def right_col_mapping(col_id: str) -> typing.Optional[str]: - if col_id in right_col_ids: - join_key_part = right_col_ids.index(col_id) - if merge_join_key_mask[join_key_part]: - return None - return get_column_right(col_id) - - left_columns = [left_col_mapping(col_id) for col_id in self.value_columns] - - right_columns = [ - typing.cast(str, right_col_mapping(col_id)) - for col_id in other.value_columns - if right_col_mapping(col_id) - ] + if sort: + # sort uses coalesced join keys always + joined_expr = joined_expr.order_by( + [ordering.OrderingColumnReference(col_id) for col_id in coalesced_ids], + stable=True, + ) - expr = joined_expr.select_columns([*left_columns, *right_columns]) + joined_expr = joined_expr.select_columns(result_columns) labels = utils.merge_column_labels( self.column_labels, other.column_labels, - coalesce_labels=labels_to_coalesce, + coalesce_labels=matching_join_labels, suffixes=suffixes, ) - # Constructs default index - expr, offset_index_id = expr.promote_offsets() + offset_index_id = guid.generate_guid() + expr = joined_expr.promote_offsets(offset_index_id) return Block(expr, index_columns=[offset_index_id], column_labels=labels) def _force_reproject(self) -> Block: diff --git a/bigframes/core/global_session.py b/bigframes/core/global_session.py index 68529981cd..1f960839a0 100644 --- a/bigframes/core/global_session.py +++ b/bigframes/core/global_session.py @@ -24,7 +24,7 @@ _global_session_lock = threading.Lock() -def reset_session() -> None: +def close_session() -> None: """Start a fresh session the next time a function requires a session. Closes the current session if it was already started. 
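Together with the new `application_name` option and the `reset_session` → `close_session` rename above, the public workflow looks roughly like the following sketch (project, partner string, and location are placeholder values):

    import bigframes.pandas as bpd

    # Options can only be changed before the first query starts a session.
    bpd.options.bigquery.application_name = "example-app/1.0.0"  # placeholder
    bpd.options.bigquery.project = "my-project"  # placeholder

    df = bpd.read_gbq("SELECT 1 AS x")

    # close_session() replaces the old reset_session(); the next operation starts
    # a fresh session, so session-bound options such as location can change again.
    bpd.close_session()
    bpd.options.bigquery.location = "europe-west4"  # placeholder
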
diff --git a/bigframes/core/indexers.py b/bigframes/core/indexers.py index a74880041c..4f5a9471b9 100644 --- a/bigframes/core/indexers.py +++ b/bigframes/core/indexers.py @@ -15,12 +15,13 @@ from __future__ import annotations import typing -from typing import Tuple, Union +from typing import List, Tuple, Union import ibis import pandas as pd import bigframes.constants as constants +import bigframes.core.blocks import bigframes.core.guid as guid import bigframes.core.indexes as indexes import bigframes.core.scalar @@ -107,6 +108,16 @@ def __getitem__(self, key: int) -> bigframes.core.scalar.Scalar: return self._series.iloc[key] +class AtSeriesIndexer: + def __init__(self, series: bigframes.series.Series): + self._series = series + + def __getitem__( + self, key: LocSingleKey + ) -> Union[bigframes.core.scalar.Scalar, bigframes.series.Series]: + return self._series.loc[key] + + class LocDataFrameIndexer: def __init__(self, dataframe: bigframes.dataframe.DataFrame): self._dataframe = dataframe @@ -214,12 +225,26 @@ def __getitem__(self, key: tuple) -> bigframes.core.scalar.Scalar: raise ValueError(error_message) if len(key) != 2: raise TypeError(error_message) - block = self._dataframe._block + block: bigframes.core.blocks.Block = self._dataframe._block # type: ignore column_block = block.select_columns([block.value_columns[key[1]]]) column = bigframes.series.Series(column_block) return column.iloc[key[0]] +class AtDataFrameIndexer: + def __init__(self, dataframe: bigframes.dataframe.DataFrame): + self._dataframe = dataframe + + def __getitem__( + self, key: tuple + ) -> Union[bigframes.core.scalar.Scalar, bigframes.series.Series]: + if not isinstance(key, tuple): + raise TypeError( + "DataFrame.at should be indexed by a (row label, column name) tuple." + ) + return self._dataframe.loc[key] + + @typing.overload def _loc_getitem_series_or_dataframe( series_or_dataframe: bigframes.series.Series, key @@ -246,40 +271,59 @@ def _loc_getitem_series_or_dataframe( if isinstance(key, bigframes.series.Series) and key.dtype == "boolean": return series_or_dataframe[key] elif isinstance(key, bigframes.series.Series): - # TODO(henryjsolberg): support MultiIndex temp_name = guid.generate_guid(prefix="temp_series_name_") + if len(series_or_dataframe.index.names) > 1: + temp_name = series_or_dataframe.index.names[0] key = key.rename(temp_name) keys_df = key.to_frame() keys_df = keys_df.set_index(temp_name, drop=True) return _perform_loc_list_join(series_or_dataframe, keys_df) elif isinstance(key, bigframes.core.indexes.Index): - # TODO(henryjsolberg): support MultiIndex block = key._data._get_block() block = block.select_columns(()) keys_df = bigframes.dataframe.DataFrame(block) return _perform_loc_list_join(series_or_dataframe, keys_df) elif pd.api.types.is_list_like(key): - # TODO(henryjsolberg): support MultiIndex - if len(key) == 0: # type: ignore + key = typing.cast(List, key) + if len(key) == 0: return typing.cast( Union[bigframes.dataframe.DataFrame, bigframes.series.Series], series_or_dataframe.iloc[0:0], ) - - # We can't upload a DataFrame with None as the column name, so set it - # an arbitrary string. 
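The `AtDataFrameIndexer` added earlier in this file simply routes a `(row label, column name)` pair through `.loc`, and the branch below extends list-based `.loc` lookups to MultiIndex frames by accepting a list of per-level tuples. An illustrative sketch with made-up data (column names are placeholders):

    import bigframes.pandas as bpd

    df = bpd.DataFrame(
        {"level_0": ["a", "a", "b"], "level_1": [1, 2, 1], "value": [10, 20, 30]}
    ).set_index(["level_0", "level_1"])

    # each tuple supplies one value per index level
    subset = df.loc[[("a", 1), ("b", 1)]]
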
- index_name = series_or_dataframe.index.name - index_name_is_none = index_name is None - if index_name_is_none: - index_name = "unnamed_col" - - keys_df = bigframes.dataframe.DataFrame( - {index_name: key}, session=series_or_dataframe._get_block().expr._session - ) - keys_df = keys_df.set_index(index_name, drop=True) - - if index_name_is_none: - keys_df.index.name = None + if pd.api.types.is_list_like(key[0]): + original_index_names = series_or_dataframe.index.names + num_index_cols = len(original_index_names) + + entry_col_count_correct = [len(entry) == num_index_cols for entry in key] + if not all(entry_col_count_correct): + # pandas usually throws TypeError in these cases- tuple causes IndexError, but that + # seems like unintended behavior + raise TypeError( + "All entries must be of equal length when indexing by list of listlikes" + ) + temporary_index_names = [ + guid.generate_guid(prefix="temp_loc_index_") + for _ in range(len(original_index_names)) + ] + index_cols_dict = {} + for i in range(num_index_cols): + index_name = temporary_index_names[i] + values = [entry[i] for entry in key] + index_cols_dict[index_name] = values + keys_df = bigframes.dataframe.DataFrame(index_cols_dict) + keys_df = keys_df.set_index(temporary_index_names, drop=True) + keys_df = keys_df.rename_axis(original_index_names) + else: + # We can't upload a DataFrame with None as the column name, so set it + # an arbitrary string. + index_name = series_or_dataframe.index.name + index_name_is_none = index_name is None + if index_name_is_none: + index_name = "unnamed_col" + keys_df = bigframes.dataframe.DataFrame({index_name: key}) + keys_df = keys_df.set_index(index_name, drop=True) + if index_name_is_none: + keys_df.index.name = None return _perform_loc_list_join(series_or_dataframe, keys_df) elif isinstance(key, slice): if (key.start is None) and (key.stop is None) and (key.step is None): diff --git a/bigframes/core/indexes/index.py b/bigframes/core/indexes/index.py index 7d15e67649..677bb8529c 100644 --- a/bigframes/core/indexes/index.py +++ b/bigframes/core/indexes/index.py @@ -17,7 +17,7 @@ from __future__ import annotations import typing -from typing import Callable, Sequence, Tuple, Union +from typing import Mapping, Sequence, Tuple, Union import numpy as np import pandas @@ -27,6 +27,7 @@ import bigframes.core.block_transforms as block_ops import bigframes.core.blocks as blocks import bigframes.core.joins as joins +import bigframes.core.joins.name_resolution as join_names import bigframes.core.ordering as order import bigframes.core.utils as utils import bigframes.dtypes @@ -413,7 +414,7 @@ def join( how="left", sort=False, block_identity_join: bool = False, - ) -> Tuple[IndexValue, Tuple[Callable[[str], str], Callable[[str], str]],]: + ) -> Tuple[IndexValue, Tuple[Mapping[str, str], Mapping[str, str]],]: if not isinstance(other, IndexValue): # TODO(swast): We need to improve this error message to be more # actionable for the user. 
For example, it's possible they @@ -456,27 +457,34 @@ def join_mono_indexed( how="left", sort=False, block_identity_join: bool = False, -) -> Tuple[IndexValue, Tuple[Callable[[str], str], Callable[[str], str]],]: - ( - combined_expr, - joined_index_col_names, - (get_column_left, get_column_right), - ) = joins.join_by_column( +) -> Tuple[IndexValue, Tuple[Mapping[str, str], Mapping[str, str]],]: + left_expr = left._block.expr + right_expr = right._block.expr + get_column_left, get_column_right = join_names.JOIN_NAME_REMAPPER( + left_expr.column_ids, right_expr.column_ids + ) + combined_expr = joins.join_by_column( left._block.expr, left._block.index_columns, right._block.expr, right._block.index_columns, how=how, - sort=sort, allow_row_identity_join=(not block_identity_join), ) # Drop original indices from each side. and used the coalesced combination generated by the join. - left_indices = [get_column_left(col_id) for col_id in left._block.index_columns] - right_indices = [get_column_right(col_id) for col_id in right._block.index_columns] - combined_expr = combined_expr.drop_columns(left_indices).drop_columns(right_indices) + left_index = get_column_left[left._block.index_columns[0]] + right_index = get_column_right[right._block.index_columns[0]] + # Drop original indices from each side. and used the coalesced combination generated by the join. + combined_expr, coalesced_join_cols = coalesce_columns( + combined_expr, [left_index], [right_index], how=how + ) + if sort: + combined_expr = combined_expr.order_by( + [order.OrderingColumnReference(col_id) for col_id in coalesced_join_cols] + ) block = blocks.Block( combined_expr, - index_columns=[*joined_index_col_names], + index_columns=coalesced_join_cols, column_labels=[*left._block.column_labels, *right._block.column_labels], index_labels=[left.name] if left.name == right.name else [None], ) @@ -493,7 +501,7 @@ def join_multi_indexed( how="left", sort=False, block_identity_join: bool = False, -) -> Tuple[IndexValue, Tuple[Callable[[str], str], Callable[[str], str]],]: +) -> Tuple[IndexValue, Tuple[Mapping[str, str], Mapping[str, str]],]: if not (left.is_uniquely_named() and right.is_uniquely_named()): raise ValueError("Joins not supported on indices with non-unique level names") @@ -508,25 +516,33 @@ def join_multi_indexed( right_join_ids = [right.resolve_level_name(name) for name in common_names] names_fully_match = len(left_only_names) == 0 and len(right_only_names) == 0 - ( - combined_expr, - joined_index_col_names, - (get_column_left, get_column_right), - ) = joins.join_by_column( - left._block.expr, + + left_expr = left._block.expr + right_expr = right._block.expr + get_column_left, get_column_right = join_names.JOIN_NAME_REMAPPER( + left_expr.column_ids, right_expr.column_ids + ) + + combined_expr = joins.join_by_column( + left_expr, left_join_ids, - right._block.expr, + right_expr, right_join_ids, how=how, - sort=sort, # If we're only joining on a subset of the index columns, we need to # perform a true join. - allow_row_identity_join=names_fully_match and not block_identity_join, + allow_row_identity_join=(names_fully_match and not block_identity_join), ) + left_ids_post_join = [get_column_left[id] for id in left_join_ids] + right_ids_post_join = [get_column_right[id] for id in right_join_ids] # Drop original indices from each side. and used the coalesced combination generated by the join. 
- combined_expr = combined_expr.drop_columns( - [get_column_left(col) for col in left_join_ids] - ).drop_columns([get_column_right(col) for col in right_join_ids]) + combined_expr, coalesced_join_cols = coalesce_columns( + combined_expr, left_ids_post_join, right_ids_post_join, how=how + ) + if sort: + combined_expr = combined_expr.order_by( + [order.OrderingColumnReference(col_id) for col_id in coalesced_join_cols] + ) if left.nlevels == 1: index_labels = right.names @@ -536,12 +552,13 @@ def join_multi_indexed( index_labels = [*common_names, *left_only_names, *right_only_names] def resolve_label_id(label: blocks.Label) -> str: + # if name is shared between both blocks, coalesce the values if label in common_names: - return joined_index_col_names[common_names.index(label)] + return coalesced_join_cols[common_names.index(label)] if label in left_only_names: - return get_column_left(left.resolve_level_name(label)) + return get_column_left[left.resolve_level_name(label)] if label in right_only_names: - return get_column_right(right.resolve_level_name(label)) + return get_column_right[right.resolve_level_name(label)] raise ValueError(f"Unexpected label: {label}") index_columns = [resolve_label_id(label) for label in index_labels] @@ -556,3 +573,29 @@ def resolve_label_id(label: blocks.Label) -> str: typing.cast(IndexValue, block.index), (get_column_left, get_column_right), ) + + +def coalesce_columns( + expr: core.ArrayValue, + left_ids: typing.Sequence[str], + right_ids: typing.Sequence[str], + how: str, +) -> Tuple[core.ArrayValue, Sequence[str]]: + result_ids = [] + for left_id, right_id in zip(left_ids, right_ids): + if how == "left" or how == "inner": + result_ids.append(left_id) + expr = expr.drop_columns([right_id]) + elif how == "right": + result_ids.append(right_id) + expr = expr.drop_columns([left_id]) + elif how == "outer": + coalesced_id = bigframes.core.guid.generate_guid() + expr = expr.project_binary_op( + left_id, right_id, ops.coalesce_op, coalesced_id + ) + expr = expr.drop_columns([left_id, right_id]) + result_ids.append(coalesced_id) + else: + raise ValueError(f"Unexpected join type: {how}. {constants.FEEDBACK_LINK}") + return expr, result_ids diff --git a/bigframes/core/joins/name_resolution.py b/bigframes/core/joins/name_resolution.py new file mode 100644 index 0000000000..df946b3a59 --- /dev/null +++ b/bigframes/core/joins/name_resolution.py @@ -0,0 +1,46 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +from typing import Mapping, Sequence, Tuple + + +class JoinNameRemapper: + def __init__(self, namespace: str) -> None: + self._namespace = namespace + + def __call__( + self, left_column_ids: Sequence[str], right_column_ids: Sequence[str] + ) -> Tuple[Mapping[str, str], Mapping[str, str]]: + """ + When joining column ids from different namespaces, this function defines how names are remapped. + + Take care to map value column ids and hidden column ids in separate namespaces. 
This is important because value + column ids must be deterministic as they are referenced by dependent operators. The generation of hidden ids is + dependent on compilation context, and should be completely separated from value column id mappings. + """ + # This naming strategy depends on the number of value columns in source tables. + # This means column id mappings must be adjusted if pushing operations above or below join in transformation + new_left_ids = { + col: f"{self._namespace}_l_{i}" for i, col in enumerate(left_column_ids) + } + new_right_ids = { + col: f"{self._namespace}_r_{i}" for i, col in enumerate(right_column_ids) + } + return new_left_ids, new_right_ids + + +# Defines how column ids are remapped, regardless of join strategy or ordering mode +# Use this remapper for all value column remappings. +JOIN_NAME_REMAPPER = JoinNameRemapper("bfjoin") diff --git a/bigframes/core/joins/row_identity.py b/bigframes/core/joins/row_identity.py index 156e7aef40..76e456ec94 100644 --- a/bigframes/core/joins/row_identity.py +++ b/bigframes/core/joins/row_identity.py @@ -18,20 +18,20 @@ import functools import typing -from typing import Callable, Tuple import ibis import ibis.expr.types as ibis_types import bigframes.constants as constants import bigframes.core as core +import bigframes.core.joins.name_resolution as naming SUPPORTED_ROW_IDENTITY_HOW = {"outer", "left", "inner"} def join_by_row_identity( left: core.ArrayValue, right: core.ArrayValue, *, how: str -) -> Tuple[core.ArrayValue, Tuple[Callable[[str], str], Callable[[str], str]],]: +) -> core.ArrayValue: """Compute join when we are joining by row identity not a specific column.""" if how not in SUPPORTED_ROW_IDENTITY_HOW: raise NotImplementedError( @@ -62,31 +62,42 @@ def join_by_row_identity( left_mask = left_relative_predicates if how in ["right", "outer"] else None right_mask = right_relative_predicates if how in ["left", "outer"] else None + + # Public mapping must use JOIN_NAME_REMAPPER to stay in sync with consumers of join result + lpublicmapping, rpublicmapping = naming.JOIN_NAME_REMAPPER( + left.column_ids, right.column_ids + ) + lhiddenmapping, rhiddenmapping = naming.JoinNameRemapper(namespace="hidden")( + left._hidden_column_ids, right._hidden_column_ids + ) + map_left_id = {**lpublicmapping, **lhiddenmapping} + map_right_id = {**rpublicmapping, **rhiddenmapping} + joined_columns = [ - _mask_value(left._get_ibis_column(key), left_mask).name(map_left_id(key)) + _mask_value(left._get_ibis_column(key), left_mask).name(map_left_id[key]) for key in left.column_ids ] + [ - _mask_value(right._get_ibis_column(key), right_mask).name(map_right_id(key)) + _mask_value(right._get_ibis_column(key), right_mask).name(map_right_id[key]) for key in right.column_ids ] # If left isn't being masked, can just use left ordering if not left_mask: col_mapping = { - order_ref.column_id: map_left_id(order_ref.column_id) + order_ref.column_id: map_left_id[order_ref.column_id] for order_ref in left._ordering.ordering_value_columns } new_ordering = left._ordering.with_column_remap(col_mapping) else: ordering_columns = [ - col_ref.with_name(map_left_id(col_ref.column_id)) + col_ref.with_name(map_left_id[col_ref.column_id]) for col_ref in left._ordering.ordering_value_columns ] + [ - col_ref.with_name(map_right_id(col_ref.column_id)) + col_ref.with_name(map_right_id[col_ref.column_id]) for col_ref in right._ordering.ordering_value_columns ] left_total_order_cols = frozenset( - map_left_id(col) for col in left._ordering.total_ordering_columns + 
map_left_id[col] for col in left._ordering.total_ordering_columns ) # Assume that left ordering is sufficient since 1:1 join over same base table join_total_order_cols = left_total_order_cols @@ -95,12 +106,12 @@ def join_by_row_identity( ) hidden_ordering_columns = [ - left._get_hidden_ordering_column(key.column_id).name(map_left_id(key.column_id)) + left._get_hidden_ordering_column(key.column_id).name(map_left_id[key.column_id]) for key in left._ordering.ordering_value_columns if key.column_id in left._hidden_ordering_column_names.keys() ] + [ right._get_hidden_ordering_column(key.column_id).name( - map_right_id(key.column_id) + map_right_id[key.column_id] ) for key in right._ordering.ordering_value_columns if key.column_id in right._hidden_ordering_column_names.keys() @@ -114,18 +125,7 @@ def join_by_row_identity( ordering=new_ordering, predicates=combined_predicates, ) - return joined_expr, ( - lambda key: map_left_id(key), - lambda key: map_right_id(key), - ) - - -def map_left_id(left_side_id): - return f"{left_side_id}_x" - - -def map_right_id(right_side_id): - return f"{right_side_id}_y" + return joined_expr def _mask_value( diff --git a/bigframes/core/joins/single_column.py b/bigframes/core/joins/single_column.py index f194b8f8c4..0c0e2008b5 100644 --- a/bigframes/core/joins/single_column.py +++ b/bigframes/core/joins/single_column.py @@ -16,17 +16,15 @@ from __future__ import annotations -import itertools import typing -from typing import Callable, Literal, Tuple +from typing import Literal, Mapping import ibis import ibis.expr.datatypes as ibis_dtypes import ibis.expr.types as ibis_types -import bigframes.constants as constants import bigframes.core as core -import bigframes.core.guid as guid +import bigframes.core.joins.name_resolution as naming import bigframes.core.joins.row_identity import bigframes.core.ordering @@ -43,13 +41,8 @@ def join_by_column( "outer", "right", ], - sort: bool = False, allow_row_identity_join: bool = True, -) -> Tuple[ - core.ArrayValue, - typing.Sequence[str], - Tuple[Callable[[str], str], Callable[[str], str]], -]: +) -> core.ArrayValue: """Join two expressions by column equality. Arguments: @@ -62,14 +55,9 @@ def join_by_column( If True, allow matching by row identity. Set to False to always perform a true JOIN in generated SQL. Returns: - The joined expression and the objects needed to interpret it. - - * ArrayValue: Joined table with all columns from left and right. - * Sequence[str]: Column IDs of the coalesced join columns. Sometimes either the - left/right table will have missing rows. This column pulls the - non-NULL value from either left/right. - * Tuple[Callable, Callable]: For a given column ID from left or right, - respectively, return the new column id from the combined expression. + The joined expression. The resulting columns will be, in order, + first the coalesced join keys, then, all the left columns, and + finally, all the right columns. 
""" if ( allow_row_identity_join @@ -85,71 +73,33 @@ def join_by_column( for lcol, rcol in zip(left_column_ids, right_column_ids) ) ): - combined_expr, ( - get_column_left, - get_column_right, - ) = bigframes.core.joins.row_identity.join_by_row_identity(left, right, how=how) - left_join_keys = [ - combined_expr._get_ibis_column(get_column_left(col)) - for col in left_column_ids - ] - right_join_keys = [ - combined_expr._get_ibis_column(get_column_right(col)) - for col in right_column_ids - ] - join_key_cols = get_coalesced_join_cols(left_join_keys, right_join_keys, how) - join_key_ids = [col.get_name() for col in join_key_cols] - combined_expr = combined_expr._projection( - [*join_key_cols, *combined_expr.columns] - ) - if sort: - combined_expr = combined_expr.order_by( - [ - core.OrderingColumnReference(join_col_id) - for join_col_id in join_key_ids - ] - ) - return ( - combined_expr, - join_key_ids, - ( - get_column_left, - get_column_right, - ), + return bigframes.core.joins.row_identity.join_by_row_identity( + left, right, how=how ) else: - lmapping = { - col_id: guid.generate_guid() - for col_id in itertools.chain( - left.column_ids, left._hidden_ordering_column_names - ) - } - rmapping = { - col_id: guid.generate_guid() - for col_id in itertools.chain( - right.column_ids, right._hidden_ordering_column_names - ) - } - - def get_column_left(col_id): - return lmapping[col_id] - - def get_column_right(col_id): - return rmapping[col_id] + # Value column mapping must use JOIN_NAME_REMAPPER to stay in sync with consumers of join result + l_public_mapping, r_public_mapping = naming.JOIN_NAME_REMAPPER( + left.column_ids, right.column_ids + ) + l_hidden_mapping, r_hidden_mapping = naming.JoinNameRemapper( + namespace="hidden" + )(left._hidden_column_ids, right._hidden_column_ids) + l_mapping = {**l_public_mapping, **l_hidden_mapping} + r_mapping = {**r_public_mapping, **r_hidden_mapping} left_table = left._to_ibis_expr( "unordered", expose_hidden_cols=True, - col_id_overrides=lmapping, + col_id_overrides=l_mapping, ) right_table = right._to_ibis_expr( "unordered", expose_hidden_cols=True, - col_id_overrides=rmapping, + col_id_overrides=r_mapping, ) join_conditions = [ - value_to_join_key(left_table[lmapping[left_index]]) - == value_to_join_key(right_table[rmapping[right_index]]) + value_to_join_key(left_table[l_mapping[left_index]]) + == value_to_join_key(right_table[r_mapping[right_index]]) for left_index, right_index in zip(left_column_ids, right_column_ids) ] @@ -158,97 +108,39 @@ def get_column_right(col_id): right_table, predicates=join_conditions, how=how, - lname="{name}_x", - rname="{name}_y", ) # Preserve ordering accross joins. ordering = join_orderings( left._ordering, right._ordering, - get_column_left, - get_column_right, + l_mapping, + r_mapping, left_order_dominates=(how != "right"), ) - left_join_keys = [ - combined_table[get_column_left(col)] for col in left_column_ids - ] - right_join_keys = [ - combined_table[get_column_right(col)] for col in right_column_ids - ] - join_key_cols = get_coalesced_join_cols(left_join_keys, right_join_keys, how) # We could filter out the original join columns, but predicates/ordering # might still reference them in implicit joins. 
- columns = ( - join_key_cols - + [combined_table[get_column_left(col.get_name())] for col in left.columns] - + [ - combined_table[get_column_right(col.get_name())] - for col in right.columns - ] - ) + columns = [ + combined_table[l_mapping[col.get_name()]] for col in left.columns + ] + [combined_table[r_mapping[col.get_name()]] for col in right.columns] hidden_ordering_columns = [ *[ - combined_table[get_column_left(col.get_name())] - for col in left.hidden_ordering_columns + combined_table[l_hidden_mapping[col.get_name()]] + for col in left._hidden_ordering_columns ], *[ - combined_table[get_column_right(col.get_name())] - for col in right.hidden_ordering_columns + combined_table[r_hidden_mapping[col.get_name()]] + for col in right._hidden_ordering_columns ], ] - combined_expr = core.ArrayValue( + return core.ArrayValue( left._session, combined_table, columns=columns, hidden_ordering_columns=hidden_ordering_columns, ordering=ordering, ) - if sort: - combined_expr = combined_expr.order_by( - [ - core.OrderingColumnReference(join_key_col.get_name()) - for join_key_col in join_key_cols - ] - ) - return ( - combined_expr, - [key.get_name() for key in join_key_cols], - (get_column_left, get_column_right), - ) - - -def get_coalesced_join_cols( - left_join_cols: typing.Iterable[ibis_types.Value], - right_join_cols: typing.Iterable[ibis_types.Value], - how: str, -) -> typing.List[ibis_types.Value]: - join_key_cols: list[ibis_types.Value] = [] - for left_col, right_col in zip(left_join_cols, right_join_cols): - if how == "left" or how == "inner": - join_key_cols.append(left_col.name(guid.generate_guid(prefix="index_"))) - elif how == "right": - join_key_cols.append(right_col.name(guid.generate_guid(prefix="index_"))) - elif how == "outer": - # The left index and the right index might contain null values, for - # example due to an outer join with different numbers of rows. Coalesce - # these to take the index value from either column. - # Use a random name in case the left index and the right index have the - # same name. In such a case, _x and _y suffixes will already be used. - # Don't need to coalesce if they are exactly the same column. - if left_col.name("index").equals(right_col.name("index")): - join_key_cols.append(left_col.name(guid.generate_guid(prefix="index_"))) - else: - join_key_cols.append( - ibis.coalesce( - left_col, - right_col, - ).name(guid.generate_guid(prefix="index_")) - ) - else: - raise ValueError(f"Unexpected join type: {how}. 
{constants.FEEDBACK_LINK}") - return join_key_cols def value_to_join_key(value: ibis_types.Value): @@ -261,16 +153,16 @@ def value_to_join_key(value: ibis_types.Value): def join_orderings( left: core.ExpressionOrdering, right: core.ExpressionOrdering, - left_id_mapping: Callable[[str], str], - right_id_mapping: Callable[[str], str], + left_id_mapping: Mapping[str, str], + right_id_mapping: Mapping[str, str], left_order_dominates: bool = True, ) -> core.ExpressionOrdering: left_ordering_refs = [ - ref.with_name(left_id_mapping(ref.column_id)) + ref.with_name(left_id_mapping[ref.column_id]) for ref in left.all_ordering_columns ] right_ordering_refs = [ - ref.with_name(right_id_mapping(ref.column_id)) + ref.with_name(right_id_mapping[ref.column_id]) for ref in right.all_ordering_columns ] if left_order_dominates: @@ -279,10 +171,10 @@ def join_orderings( joined_refs = [*right_ordering_refs, *left_ordering_refs] left_total_order_cols = frozenset( - [left_id_mapping(id) for id in left.total_ordering_columns] + [left_id_mapping[id] for id in left.total_ordering_columns] ) right_total_order_cols = frozenset( - [right_id_mapping(id) for id in right.total_ordering_columns] + [right_id_mapping[id] for id in right.total_ordering_columns] ) return core.ExpressionOrdering( ordering_value_columns=joined_refs, diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 015a7642f8..5740d2c4dc 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -259,6 +259,10 @@ def iloc(self) -> indexers.ILocDataFrameIndexer: def iat(self) -> indexers.IatDataFrameIndexer: return indexers.IatDataFrameIndexer(self) + @property + def at(self) -> indexers.AtDataFrameIndexer: + return indexers.AtDataFrameIndexer(self) + @property def dtypes(self) -> pandas.Series: return pandas.Series(data=self._block.dtypes, index=self._block.column_labels) @@ -419,7 +423,7 @@ def _getitem_bool_series(self, key: bigframes.series.Series) -> DataFrame: get_column_right, ) = self._block.index.join(key._block.index, how="left") block = combined_index._block - filter_col_id = get_column_right(key._value_column) + filter_col_id = get_column_right[key._value_column] block = block.filter(filter_col_id) block = block.drop_columns([filter_col_id]) return DataFrame(block) @@ -560,18 +564,18 @@ def _apply_series_binop( ) series_column_id = other._value_column - series_col = get_column_right(series_column_id) + series_col = get_column_right[series_column_id] block = joined_index._block for column_id, label in zip( self._block.value_columns, self._block.column_labels ): block, _ = block.apply_binary_op( - get_column_left(column_id), + get_column_left[column_id], series_col, op, result_label=label, ) - block = block.drop_columns([get_column_left(column_id)]) + block = block.drop_columns([get_column_left[column_id]]) block = block.drop_columns([series_col]) block = block.with_index_labels(self.index.names) @@ -603,22 +607,22 @@ def _apply_dataframe_binop( left_col_id = self._block.value_columns[left_index] right_col_id = other._block.value_columns[right_index] block, result_col_id = block.apply_binary_op( - get_column_left(left_col_id), - get_column_right(right_col_id), + get_column_left[left_col_id], + get_column_right[right_col_id], op, ) binop_result_ids.append(result_col_id) elif left_index >= 0: left_col_id = self._block.value_columns[left_index] block, result_col_id = block.apply_unary_op( - get_column_left(left_col_id), + get_column_left[left_col_id], ops.partial_right(op, None), ) binop_result_ids.append(result_col_id) elif 
right_index >= 0: right_col_id = other._block.value_columns[right_index] block, result_col_id = block.apply_unary_op( - get_column_right(right_col_id), + get_column_right[right_col_id], ops.partial_left(op, None), ) binop_result_ids.append(result_col_id) @@ -974,7 +978,7 @@ def _drop_by_index(self, index: indexes.Index) -> DataFrame: block.index ) - new_ordering_col = get_column_right(ordering_col) + new_ordering_col = get_column_right[ordering_col] drop_block = joined_index._block drop_block, drop_col = drop_block.apply_unary_op( new_ordering_col, @@ -983,7 +987,7 @@ def _drop_by_index(self, index: indexes.Index) -> DataFrame: drop_block = drop_block.filter(drop_col) original_columns = [ - get_column_left(column) for column in self._block.value_columns + get_column_left[column] for column in self._block.value_columns ] drop_block = drop_block.select_columns(original_columns) return DataFrame(drop_block) @@ -1119,7 +1123,8 @@ def _assign_single_item( # local_df is likely (but not guarunteed) to be cached locally # since the original list came from memory and so is probably < MAX_INLINE_DF_SIZE - this_expr, this_offsets_col_id = self._get_block()._expr.promote_offsets() + this_offsets_col_id = bigframes.core.guid.generate_guid() + this_expr = self._get_block()._expr.promote_offsets(this_offsets_col_id) block = blocks.Block( expr=this_expr, index_labels=self.index.names, @@ -1156,10 +1161,10 @@ def _assign_series_join_on_index( ) column_ids = [ - get_column_left(col_id) for col_id in self._block.cols_matching_label(label) + get_column_left[col_id] for col_id in self._block.cols_matching_label(label) ] block = joined_index._block - source_column = get_column_right(series._value_column) + source_column = get_column_right[series._value_column] # Replace each column matching the label for column_id in column_ids: @@ -2032,8 +2037,8 @@ def _groupby_series( key._block.index, how="inner" if dropna else "left" ) col_ids = [ - *[get_column_left(value) for value in col_ids], - get_column_right(key._value_column), + *[get_column_left[value] for value in col_ids], + get_column_right[key._value_column], ] block = combined_index._block else: diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index 46a7a1cb50..da221a95ac 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -19,6 +19,7 @@ from typing import Any, Dict, Iterable, Literal, Tuple, Union import geopandas as gpd # type: ignore +import google.cloud.bigquery as bigquery import ibis import ibis.expr.datatypes as ibis_dtypes import ibis.expr.types as ibis_types @@ -27,6 +28,7 @@ import pyarrow as pa import bigframes.constants as constants +import third_party.bigframes_vendored.google_cloud_bigquery._pandas_helpers as gcb3p_pandas_helpers # Type hints for Pandas dtypes supported by BigQuery DataFrame Dtype = Union[ @@ -401,3 +403,18 @@ def cast_ibis_value( raise TypeError( f"Unsupported cast {value.type()} to {to_type}. {constants.FEEDBACK_LINK}" ) + + +def to_pandas_dtypes_overrides(schema: Iterable[bigquery.SchemaField]) -> Dict: + """For each STRUCT field, make sure we specify the full type to use.""" + # TODO(swast): Also override ARRAY fields. + dtypes = {} + for field in schema: + if field.field_type == "RECORD" and field.mode != "REPEATED": + # TODO(swast): We're using a private API here. Would likely be + # better if we called `to_arrow()` and converted to a pandas + # DataFrame ourselves from that. 
+ dtypes[field.name] = pd.ArrowDtype( + gcb3p_pandas_helpers.bq_to_arrow_data_type(field) + ) + return dtypes diff --git a/bigframes/formatting_helpers.py b/bigframes/formatting_helpers.py index 82e2510e2a..6851bdd2bd 100644 --- a/bigframes/formatting_helpers.py +++ b/bigframes/formatting_helpers.py @@ -153,6 +153,14 @@ def wait_for_query_job( except api_core_exceptions.GoogleAPICallError as exc: add_feedback_link(exc) raise + except KeyboardInterrupt: + query_job.cancel() + print( + f"Requested cancellation for {query_job.job_type.capitalize()}" + f" job {query_job.job_id} in location {query_job.location}..." + ) + # begin the cancel request before immediately rethrowing + raise def wait_for_job(job: GenericJob, progress_bar: Optional[str] = None): @@ -190,6 +198,14 @@ def wait_for_job(job: GenericJob, progress_bar: Optional[str] = None): except api_core_exceptions.GoogleAPICallError as exc: add_feedback_link(exc) raise + except KeyboardInterrupt: + job.cancel() + print( + f"Requested cancellation for {job.job_type.capitalize()}" + f" job {job.job_id} in location {job.location}..." + ) + # begin the cancel request before immediately rethrowing + raise def get_job_url(query_job: GenericJob): diff --git a/bigframes/ml/llm.py b/bigframes/ml/llm.py index a61dd34e6d..d78f467537 100644 --- a/bigframes/ml/llm.py +++ b/bigframes/ml/llm.py @@ -49,17 +49,17 @@ def __init__( connection_name: Optional[str] = None, ): self.session = session or bpd.get_global_session() + self._bq_connection_manager = clients.BqConnectionManager( + self.session.bqconnectionclient, self.session.resourcemanagerclient + ) connection_name = connection_name or self.session._bq_connection - self.connection_name = clients.get_connection_name_full( + self.connection_name = self._bq_connection_manager.resolve_full_connection_name( connection_name, default_project=self.session._project, default_location=self.session._location, ) - self._bq_connection_manager = clients.BqConnectionManager( - self.session.bqconnectionclient, self.session.resourcemanagerclient - ) self._bqml_model_factory = globals.bqml_model_factory() self._bqml_model: core.BqmlModel = self._create_bqml_model() @@ -188,17 +188,17 @@ def __init__( connection_name: Optional[str] = None, ): self.session = session or bpd.get_global_session() + self._bq_connection_manager = clients.BqConnectionManager( + self.session.bqconnectionclient, self.session.resourcemanagerclient + ) connection_name = connection_name or self.session._bq_connection - self.connection_name = clients.get_connection_name_full( + self.connection_name = self._bq_connection_manager.resolve_full_connection_name( connection_name, default_project=self.session._project, default_location=self.session._location, ) - self._bq_connection_manager = clients.BqConnectionManager( - self.session.bqconnectionclient, self.session.resourcemanagerclient - ) self._bqml_model_factory = globals.bqml_model_factory() self._bqml_model: core.BqmlModel = self._create_bqml_model() diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index f330a703b2..a29dd36c72 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -913,6 +913,16 @@ def ge_op( return x >= y +def coalesce_op( + x: ibis_types.Value, + y: ibis_types.Value, +): + if x.name("name").equals(y.name("name")): + return x + else: + return ibis.coalesce(x, y) + + @short_circuit_nulls(ibis_dtypes.int) def floordiv_op( x: ibis_types.Value, diff --git a/bigframes/operations/base.py b/bigframes/operations/base.py 
index fc76d07edb..b9abb2cc03 100644 --- a/bigframes/operations/base.py +++ b/bigframes/operations/base.py @@ -196,8 +196,8 @@ def _align_n( get_column_right, ) = block.index.join(other._block.index, how=how) value_ids = [ - *[get_column_left(value) for value in value_ids], - get_column_right(other._value_column), + *[get_column_left[value] for value in value_ids], + get_column_right[other._value_column], ] block = combined_index._block else: diff --git a/bigframes/operations/structs.py b/bigframes/operations/structs.py index 80d51115d0..506a557709 100644 --- a/bigframes/operations/structs.py +++ b/bigframes/operations/structs.py @@ -25,7 +25,7 @@ import third_party.bigframes_vendored.pandas.core.arrays.arrow.accessors as vendoracessors -class StructField(bigframes.operations.UnaryOp): +class _StructField(bigframes.operations.UnaryOp): def __init__(self, name_or_index: str | int): self._name_or_index = name_or_index @@ -44,7 +44,7 @@ class StructAccessor( __doc__ = vendoracessors.StructAccessor.__doc__ def field(self, name_or_index: str | int) -> bigframes.series.Series: - series = self._apply_unary_op(StructField(name_or_index)) + series = self._apply_unary_op(_StructField(name_or_index)) if isinstance(name_or_index, str): name = name_or_index else: diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index cb27834590..1b9144fb62 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -51,6 +51,7 @@ import bigframes.dataframe import bigframes.series import bigframes.session +import bigframes.session.clients import third_party.bigframes_vendored.pandas.core.reshape.concat as vendored_pandas_concat import third_party.bigframes_vendored.pandas.core.reshape.merge as vendored_pandas_merge import third_party.bigframes_vendored.pandas.core.reshape.tile as vendored_pandas_tile @@ -180,11 +181,12 @@ def _set_default_session_location_if_possible(query): ): return - clients_provider = bigframes.session.ClientsProvider( + clients_provider = bigframes.session.clients.ClientsProvider( project=options.bigquery.project, location=options.bigquery.location, use_regional_endpoints=options.bigquery.use_regional_endpoints, credentials=options.bigquery.credentials, + application_name=options.bigquery.application_name, ) bqclient = clients_provider.bqclient @@ -400,6 +402,7 @@ def remote_function( bigquery_connection: Optional[str] = None, reuse: bool = True, name: Optional[str] = None, + packages: Optional[Sequence[str]] = None, ): return global_session.with_default_session( bigframes.session.Session.remote_function, @@ -409,6 +412,7 @@ def remote_function( bigquery_connection=bigquery_connection, reuse=reuse, name=name, + packages=packages, ) @@ -447,7 +451,7 @@ def read_gbq_function(function_name: str): # Session management APIs get_global_session = global_session.get_global_session -reset_session = global_session.reset_session +close_session = global_session.close_session # Use __all__ to let type checkers know what is part of the public API. 
@@ -478,5 +482,5 @@ def read_gbq_function(function_name: str): "options", # Session management APIs "get_global_session", - "reset_session", + "close_session", ] diff --git a/bigframes/remote_function.py b/bigframes/remote_function.py index 81ba26600b..c82ba84056 100644 --- a/bigframes/remote_function.py +++ b/bigframes/remote_function.py @@ -100,9 +100,12 @@ def get_remote_function_locations(bq_location): return bq_location, cloud_function_region -def _get_hash(def_): +def _get_hash(def_, package_requirements=None): "Get hash (32 digits alphanumeric) of a function." def_repr = cloudpickle.dumps(def_, protocol=_pickle_protocol_version) + if package_requirements: + for p in sorted(package_requirements): + def_repr += p.encode() return hashlib.md5(def_repr).hexdigest() @@ -129,18 +132,18 @@ class IbisSignature(NamedTuple): output_type: IbisDataType -def get_cloud_function_name(def_, uniq_suffix=None): +def get_cloud_function_name(def_, uniq_suffix=None, package_requirements=None): "Get a name for the cloud function for the given user defined function." - cf_name = _get_hash(def_) + cf_name = _get_hash(def_, package_requirements) cf_name = f"bigframes-{cf_name}" # for identification if uniq_suffix: cf_name = f"{cf_name}-{uniq_suffix}" return cf_name -def get_remote_function_name(def_, uniq_suffix=None): +def get_remote_function_name(def_, uniq_suffix=None, package_requirements=None): "Get a name for the BQ remote function for the given user defined function." - bq_rf_name = _get_hash(def_) + bq_rf_name = _get_hash(def_, package_requirements) bq_rf_name = f"bigframes_{bq_rf_name}" # for identification if uniq_suffix: bq_rf_name = f"{bq_rf_name}_{uniq_suffix}" @@ -200,7 +203,8 @@ def create_bq_remote_function( RETURNS {bq_function_return_type} REMOTE WITH CONNECTION `{self._gcp_project_id}.{self._bq_location}.{self._bq_connection_id}` OPTIONS ( - endpoint = "{endpoint}" + endpoint = "{endpoint}", + max_batching_rows = 1000 )""" logger.info(f"Creating BQ remote function: {create_function_ddl}") @@ -320,11 +324,14 @@ def {handler_func_name}(request): return handler_func_name - def generate_cloud_function_code(self, def_, dir): + def generate_cloud_function_code(self, def_, dir, package_requirements=None): """Generate the cloud function code for a given user defined function.""" # requirements.txt requirements = ["cloudpickle >= 2.1.0"] + if package_requirements: + requirements.extend(package_requirements) + requirements = sorted(requirements) requirements_txt = os.path.join(dir, "requirements.txt") with open(requirements_txt, "w") as f: f.write("\n".join(requirements)) @@ -333,12 +340,14 @@ def generate_cloud_function_code(self, def_, dir): entry_point = self.generate_cloud_function_main_code(def_, dir) return entry_point - def create_cloud_function(self, def_, cf_name): + def create_cloud_function(self, def_, cf_name, package_requirements=None): """Create a cloud function from the given user defined function.""" # Build and deploy folder structure containing cloud function with tempfile.TemporaryDirectory() as dir: - entry_point = self.generate_cloud_function_code(def_, dir) + entry_point = self.generate_cloud_function_code( + def_, dir, package_requirements + ) archive_path = shutil.make_archive(dir, "zip", dir) # We are creating cloud function source code from the currently running @@ -392,6 +401,9 @@ def create_cloud_function(self, def_, cf_name): function.build_config.source.storage_source.object_ = ( upload_url_response.storage_source.object_ ) + function.service_config = 
functions_v2.ServiceConfig() + function.service_config.available_memory = "1024M" + function.service_config.timeout_seconds = 600 create_function_request.function = function # Create the cloud function and wait for it to be ready to use @@ -422,6 +434,7 @@ def provision_bq_remote_function( output_type, reuse, name, + package_requirements, ): """Provision a BigQuery remote function.""" # If reuse of any existing function with the same name (indicated by the @@ -435,19 +448,25 @@ def provision_bq_remote_function( # Derive the name of the cloud function underlying the intended BQ # remote function - cloud_function_name = get_cloud_function_name(def_, uniq_suffix) + cloud_function_name = get_cloud_function_name( + def_, uniq_suffix, package_requirements + ) cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name) # Create the cloud function if it does not exist if not cf_endpoint: - cf_endpoint = self.create_cloud_function(def_, cloud_function_name) + cf_endpoint = self.create_cloud_function( + def_, cloud_function_name, package_requirements + ) else: logger.info(f"Cloud function {cloud_function_name} already exists.") # Derive the name of the remote function remote_function_name = name if not remote_function_name: - remote_function_name = get_remote_function_name(def_, uniq_suffix) + remote_function_name = get_remote_function_name( + def_, uniq_suffix, package_requirements + ) rf_endpoint, rf_conn = self.get_remote_function_specs(remote_function_name) # Create the BQ remote function in following circumstances: @@ -619,6 +638,7 @@ def remote_function( bigquery_connection: Optional[str] = None, reuse: bool = True, name: Optional[str] = None, + packages: Optional[Sequence[str]] = None, ): """Decorator to turn a user defined function into a BigQuery remote function. @@ -710,6 +730,10 @@ def remote_function( caution, because two users working in the same project and dataset could overwrite each other's remote functions if they use the same persistent name. + packages (str[], Optional): + Explicit name of the external package dependencies. Each dependency + is added to the `requirements.txt` as is, and can be of the form + supported in https://pip.pypa.io/en/stable/reference/requirements-file-format/. 
""" import bigframes.pandas as bpd @@ -772,7 +796,7 @@ def remote_function( if not bigquery_connection: bigquery_connection = session._bq_connection # type: ignore - bigquery_connection = clients.get_connection_name_full( + bigquery_connection = clients.BqConnectionManager.resolve_full_connection_name( bigquery_connection, default_project=dataset_ref.project, default_location=bq_location, @@ -821,6 +845,7 @@ def wrapper(f): ibis_signature.output_type, reuse, name, + packages, ) node = remote_function_node(dataset_ref.routine(rf_name), ibis_signature) diff --git a/bigframes/series.py b/bigframes/series.py index 56e1b43a03..84ca2a578f 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -91,6 +91,10 @@ def iloc(self) -> bigframes.core.indexers.IlocSeriesIndexer: def iat(self) -> bigframes.core.indexers.IatSeriesIndexer: return bigframes.core.indexers.IatSeriesIndexer(self) + @property + def at(self) -> bigframes.core.indexers.AtSeriesIndexer: + return bigframes.core.indexers.AtSeriesIndexer(self) + @property def name(self) -> blocks.Label: return self._name @@ -1136,10 +1140,10 @@ def _groupby_values( key._block.index, how="inner" if dropna else "left" ) - value_col = get_column_left(self._value_column) + value_col = get_column_left[self._value_column] grouping_cols = [ - *[get_column_left(value) for value in grouping_cols], - get_column_right(key._value_column), + *[get_column_left[value] for value in grouping_cols], + get_column_right[key._value_column], ] block = combined_index._block else: diff --git a/bigframes/session.py b/bigframes/session/__init__.py similarity index 89% rename from bigframes/session.py rename to bigframes/session/__init__.py index 6ad65000ce..1031fde9b5 100644 --- a/bigframes/session.py +++ b/bigframes/session/__init__.py @@ -61,7 +61,6 @@ ReadPickleBuffer, StorageOptions, ) -import pydata_google_auth import bigframes._config.bigquery_options as bigquery_options import bigframes.constants as constants @@ -75,6 +74,7 @@ import bigframes.formatting_helpers as formatting_helpers from bigframes.remote_function import read_gbq_function as bigframes_rgf from bigframes.remote_function import remote_function as bigframes_rf +import bigframes.session.clients import bigframes.version # Even though the ibis.backends.bigquery.registry import is unused, it's needed @@ -85,17 +85,7 @@ import third_party.bigframes_vendored.pandas.io.parsers.readers as third_party_pandas_readers import third_party.bigframes_vendored.pandas.io.pickle as third_party_pandas_pickle -_ENV_DEFAULT_PROJECT = "GOOGLE_CLOUD_PROJECT" -_APPLICATION_NAME = f"bigframes/{bigframes.version.__version__}" -_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"] - -# BigQuery is a REST API, which requires the protocol as part of the URL. -_BIGQUERY_REGIONAL_ENDPOINT = "https://{location}-bigquery.googleapis.com" - -# BigQuery Connection and Storage are gRPC APIs, which don't support the -# https:// protocol in the API endpoint URL. 
-_BIGQUERYCONNECTION_REGIONAL_ENDPOINT = "{location}-bigqueryconnection.googleapis.com" -_BIGQUERYSTORAGE_REGIONAL_ENDPOINT = "{location}-bigquerystorage.googleapis.com" +_BIGFRAMES_DEFAULT_CONNECTION_ID = "bigframes-default-connection" _MAX_CLUSTER_COLUMNS = 4 @@ -120,149 +110,6 @@ def _is_query(query_or_table: str) -> bool: return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None -def _get_default_credentials_with_project(): - return pydata_google_auth.default(scopes=_SCOPES, use_local_webserver=False) - - -class ClientsProvider: - """Provides client instances necessary to perform cloud operations.""" - - def __init__( - self, - project: Optional[str], - location: Optional[str], - use_regional_endpoints: Optional[bool], - credentials: Optional[google.auth.credentials.Credentials], - ): - credentials_project = None - if credentials is None: - credentials, credentials_project = _get_default_credentials_with_project() - - # Prefer the project in this order: - # 1. Project explicitly specified by the user - # 2. Project set in the environment - # 3. Project associated with the default credentials - project = ( - project - or os.getenv(_ENV_DEFAULT_PROJECT) - or typing.cast(Optional[str], credentials_project) - ) - - if not project: - raise ValueError( - "Project must be set to initialize BigQuery client. " - "Try setting `bigframes.options.bigquery.project` first." - ) - - self._project = project - self._location = location - self._use_regional_endpoints = use_regional_endpoints - self._credentials = credentials - - # cloud clients initialized for lazy load - self._bqclient = None - self._bqconnectionclient = None - self._bqstorageclient = None - self._cloudfunctionsclient = None - self._resourcemanagerclient = None - - @property - def bqclient(self): - if not self._bqclient: - bq_options = None - if self._use_regional_endpoints: - bq_options = google.api_core.client_options.ClientOptions( - api_endpoint=_BIGQUERY_REGIONAL_ENDPOINT.format( - location=self._location - ), - ) - bq_info = google.api_core.client_info.ClientInfo( - user_agent=_APPLICATION_NAME - ) - self._bqclient = bigquery.Client( - client_info=bq_info, - client_options=bq_options, - credentials=self._credentials, - project=self._project, - location=self._location, - ) - - return self._bqclient - - @property - def bqconnectionclient(self): - if not self._bqconnectionclient: - bqconnection_options = None - if self._use_regional_endpoints: - bqconnection_options = google.api_core.client_options.ClientOptions( - api_endpoint=_BIGQUERYCONNECTION_REGIONAL_ENDPOINT.format( - location=self._location - ) - ) - bqconnection_info = google.api_core.gapic_v1.client_info.ClientInfo( - user_agent=_APPLICATION_NAME - ) - self._bqconnectionclient = ( - google.cloud.bigquery_connection_v1.ConnectionServiceClient( - client_info=bqconnection_info, - client_options=bqconnection_options, - credentials=self._credentials, - ) - ) - - return self._bqconnectionclient - - @property - def bqstorageclient(self): - if not self._bqstorageclient: - bqstorage_options = None - if self._use_regional_endpoints: - bqstorage_options = google.api_core.client_options.ClientOptions( - api_endpoint=_BIGQUERYSTORAGE_REGIONAL_ENDPOINT.format( - location=self._location - ) - ) - bqstorage_info = google.api_core.gapic_v1.client_info.ClientInfo( - user_agent=_APPLICATION_NAME - ) - self._bqstorageclient = google.cloud.bigquery_storage_v1.BigQueryReadClient( - client_info=bqstorage_info, - client_options=bqstorage_options, - credentials=self._credentials, - 
) - - return self._bqstorageclient - - @property - def cloudfunctionsclient(self): - if not self._cloudfunctionsclient: - functions_info = google.api_core.gapic_v1.client_info.ClientInfo( - user_agent=_APPLICATION_NAME - ) - self._cloudfunctionsclient = ( - google.cloud.functions_v2.FunctionServiceClient( - client_info=functions_info, - credentials=self._credentials, - ) - ) - - return self._cloudfunctionsclient - - @property - def resourcemanagerclient(self): - if not self._resourcemanagerclient: - resourcemanager_info = google.api_core.gapic_v1.client_info.ClientInfo( - user_agent=_APPLICATION_NAME - ) - self._resourcemanagerclient = ( - google.cloud.resourcemanager_v3.ProjectsClient( - credentials=self._credentials, client_info=resourcemanager_info - ) - ) - - return self._resourcemanagerclient - - class Session( third_party_pandas_gbq.GBQIOMixin, third_party_pandas_parquet.ParquetIOMixin, @@ -277,14 +124,14 @@ class Session( Configuration adjusting how to connect to BigQuery and related APIs. Note that some options are ignored if ``clients_provider`` is set. - clients_provider (bigframes.session.ClientsProvider): + clients_provider (bigframes.session.bigframes.session.clients.ClientsProvider): An object providing client library objects. """ def __init__( self, context: Optional[bigquery_options.BigQueryOptions] = None, - clients_provider: Optional[ClientsProvider] = None, + clients_provider: Optional[bigframes.session.clients.ClientsProvider] = None, ): if context is None: context = bigquery_options.BigQueryOptions() @@ -304,11 +151,12 @@ def __init__( if clients_provider: self._clients_provider = clients_provider else: - self._clients_provider = ClientsProvider( + self._clients_provider = bigframes.session.clients.ClientsProvider( project=context.project, location=self._location, use_regional_endpoints=context.use_regional_endpoints, credentials=context.credentials, + application_name=context.application_name, ) self._create_and_bind_bq_session() @@ -317,11 +165,11 @@ def __init__( ibis.bigquery.connect( project_id=context.project, client=self.bqclient, - storage_client=self.bqstorageclient, + storage_client=self.bqstoragereadclient, ), ) - self._bq_connection = context.bq_connection + self._bq_connection = context.bq_connection or _BIGFRAMES_DEFAULT_CONNECTION_ID # Now that we're starting the session, don't allow the options to be # changed. @@ -336,8 +184,8 @@ def bqconnectionclient(self): return self._clients_provider.bqconnectionclient @property - def bqstorageclient(self): - return self._clients_provider.bqstorageclient + def bqstoragereadclient(self): + return self._clients_provider.bqstoragereadclient @property def cloudfunctionsclient(self): @@ -496,6 +344,8 @@ def read_gbq_query( See also: :meth:`Session.read_gbq`. """ + # NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so + # these docstrings are inline. return self._read_gbq_query( query=query, index_col=index_col, @@ -513,8 +363,6 @@ def _read_gbq_query( max_results: Optional[int] = None, api_name: str, ) -> dataframe.DataFrame: - # NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so - # these docstrings are inline. if isinstance(index_col, str): index_cols = [index_col] else: @@ -559,6 +407,8 @@ def read_gbq_table( See also: :meth:`Session.read_gbq`. """ + # NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so + # these docstrings are inline. 
return self._read_gbq_table( query=query, index_col=index_col, @@ -567,6 +417,62 @@ def read_gbq_table( api_name="read_gbq_table", ) + def _read_gbq_table_to_ibis_with_total_ordering( + self, + table_ref: bigquery.table.TableReference, + *, + api_name: str, + ) -> Tuple[ibis_types.Table, Optional[Sequence[str]]]: + """Create a read-only Ibis table expression representing a table. + + If we can get a total ordering from the table, such as via primary key + column(s), then return those too so that ordering generation can be + avoided. + """ + if table_ref.dataset_id.upper() == "_SESSION": + # _SESSION tables aren't supported by the tables.get REST API. + return ( + self.ibis_client.sql( + f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`" + ), + None, + ) + + table_expression = self.ibis_client.table( + table_ref.table_id, + database=f"{table_ref.project}.{table_ref.dataset_id}", + ) + + # If there are primary keys defined, the query engine assumes these + # columns are unique, even if the constraint is not enforced. We make + # the same assumption and use these columns as the total ordering keys. + table = self.bqclient.get_table(table_ref) + + # TODO(b/305264153): Use public properties to fetch primary keys once + # added to google-cloud-bigquery. + primary_keys = ( + table._properties.get("tableConstraints", {}) + .get("primaryKey", {}) + .get("columns") + ) + + if not primary_keys: + return table_expression, None + else: + # Read from a snapshot since we won't have to copy the table data to create a total ordering. + job_config = bigquery.QueryJobConfig() + job_config.labels["bigframes-api"] = api_name + current_timestamp = list( + self.bqclient.query( + "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", + job_config=job_config, + ).result() + )[0][0] + table_expression = self.ibis_client.sql( + bigframes_io.create_snapshot_sql(table_ref, current_timestamp) + ) + return table_expression, primary_keys + def _read_gbq_table( self, query: str, @@ -579,24 +485,19 @@ def _read_gbq_table( if max_results and max_results <= 0: raise ValueError("`max_results` should be a positive number.") - # NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so - # these docstrings are inline. # TODO(swast): Can we re-use the temp table from other reads in the # session, if the original table wasn't modified? table_ref = bigquery.table.TableReference.from_string( query, default_project=self.bqclient.project ) - if table_ref.dataset_id.upper() == "_SESSION": - # _SESSION tables aren't supported by the tables.get REST API. - table_expression = self.ibis_client.sql( - f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`" - ) - else: - table_expression = self.ibis_client.table( - table_ref.table_id, - database=f"{table_ref.project}.{table_ref.dataset_id}", - ) + ( + table_expression, + total_ordering_cols, + ) = self._read_gbq_table_to_ibis_with_total_ordering( + table_ref, + api_name=api_name, + ) for key in col_order: if key not in table_expression.columns: @@ -622,7 +523,34 @@ def _read_gbq_table( ordering = None is_total_ordering = False - if len(index_cols) != 0: + if total_ordering_cols is not None: + # Note: currently, this a table has a total ordering only when the + # primary key(s) are set on a table. The query engine assumes such + # columns are unique, even if not enforced. 
+ is_total_ordering = True + ordering = core.ExpressionOrdering( + ordering_value_columns=[ + core.OrderingColumnReference(column_id) + for column_id in total_ordering_cols + ], + total_ordering_columns=frozenset(total_ordering_cols), + ) + + if len(index_cols) != 0: + index_labels = typing.cast(List[Optional[str]], index_cols) + else: + # Use the total_ordering_cols to project offsets to use as the default index. + table_expression = table_expression.order_by(index_cols) + default_index_id = guid.generate_guid("bigframes_index_") + default_index_col = ( + ibis.row_number().cast(ibis_dtypes.int64).name(default_index_id) + ) + table_expression = table_expression.mutate( + **{default_index_id: default_index_col} + ) + index_cols = [default_index_id] + index_labels = [None] + elif len(index_cols) != 0: index_labels = typing.cast(List[Optional[str]], index_cols) distinct_table = table_expression.select(*index_cols).distinct() is_unique_sql = f"""WITH full_table AS ( @@ -1331,6 +1259,7 @@ def remote_function( bigquery_connection: Optional[str] = None, reuse: bool = True, name: Optional[str] = None, + packages: Optional[Sequence[str]] = None, ): """Decorator to turn a user defined function into a BigQuery remote function. Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. @@ -1385,7 +1314,7 @@ def remote_function( Name of the BigQuery connection. You should either have the connection already created in the `location` you have chosen, or you should have the Project IAM Admin role to enable the service - to create the connection for you if you need it.If this parameter is + to create the connection for you if you need it. If this parameter is not provided then the BigQuery connection from the session is used. reuse (bool, Optional): Reuse the remote function if already exists. @@ -1400,6 +1329,10 @@ def remote_function( caution, because two users working in the same project and dataset could overwrite each other's remote functions if they use the same persistent name. + packages (str[], Optional): + Explicit name of the external package dependencies. Each dependency + is added to the `requirements.txt` as is, and can be of the form + supported in https://pip.pypa.io/en/stable/reference/requirements-file-format/. Returns: callable: A remote function object pointing to the cloud assets created in the background to support the remote execution. The cloud assets can be @@ -1417,6 +1350,7 @@ def remote_function( bigquery_connection=bigquery_connection, reuse=reuse, name=name, + packages=packages, ) def read_gbq_function( diff --git a/bigframes/session/clients.py b/bigframes/session/clients.py new file mode 100644 index 0000000000..544f74265f --- /dev/null +++ b/bigframes/session/clients.py @@ -0,0 +1,196 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Clients manages the connection to Google APIs.""" + +import os +import typing +from typing import Optional + +import google.api_core.client_info +import google.api_core.client_options +import google.api_core.exceptions +import google.api_core.gapic_v1.client_info +import google.auth.credentials +import google.cloud.bigquery as bigquery +import google.cloud.bigquery_connection_v1 +import google.cloud.bigquery_storage_v1 +import google.cloud.functions_v2 +import google.cloud.resourcemanager_v3 +import pydata_google_auth + +import bigframes.version + +_ENV_DEFAULT_PROJECT = "GOOGLE_CLOUD_PROJECT" +_APPLICATION_NAME = f"bigframes/{bigframes.version.__version__}" +_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"] + +# BigQuery is a REST API, which requires the protocol as part of the URL. +_BIGQUERY_REGIONAL_ENDPOINT = "https://{location}-bigquery.googleapis.com" + +# BigQuery Connection and Storage are gRPC APIs, which don't support the +# https:// protocol in the API endpoint URL. +_BIGQUERYCONNECTION_REGIONAL_ENDPOINT = "{location}-bigqueryconnection.googleapis.com" +_BIGQUERYSTORAGE_REGIONAL_ENDPOINT = "{location}-bigquerystorage.googleapis.com" + + +def _get_default_credentials_with_project(): + return pydata_google_auth.default(scopes=_SCOPES, use_local_webserver=False) + + +class ClientsProvider: + """Provides client instances necessary to perform cloud operations.""" + + def __init__( + self, + project: Optional[str], + location: Optional[str], + use_regional_endpoints: Optional[bool], + credentials: Optional[google.auth.credentials.Credentials], + application_name: Optional[str], + ): + credentials_project = None + if credentials is None: + credentials, credentials_project = _get_default_credentials_with_project() + + # Prefer the project in this order: + # 1. Project explicitly specified by the user + # 2. Project set in the environment + # 3. Project associated with the default credentials + project = ( + project + or os.getenv(_ENV_DEFAULT_PROJECT) + or typing.cast(Optional[str], credentials_project) + ) + + if not project: + raise ValueError( + "Project must be set to initialize BigQuery client. " + "Try setting `bigframes.options.bigquery.project` first." 
+ ) + + self._application_name = ( + f"{_APPLICATION_NAME} {application_name}" + if application_name + else _APPLICATION_NAME + ) + self._project = project + self._location = location + self._use_regional_endpoints = use_regional_endpoints + self._credentials = credentials + + # cloud clients initialized for lazy load + self._bqclient = None + self._bqconnectionclient = None + self._bqstoragereadclient = None + self._cloudfunctionsclient = None + self._resourcemanagerclient = None + + @property + def bqclient(self): + if not self._bqclient: + bq_options = None + if self._use_regional_endpoints: + bq_options = google.api_core.client_options.ClientOptions( + api_endpoint=_BIGQUERY_REGIONAL_ENDPOINT.format( + location=self._location + ), + ) + bq_info = google.api_core.client_info.ClientInfo( + user_agent=self._application_name + ) + self._bqclient = bigquery.Client( + client_info=bq_info, + client_options=bq_options, + credentials=self._credentials, + project=self._project, + location=self._location, + ) + + return self._bqclient + + @property + def bqconnectionclient(self): + if not self._bqconnectionclient: + bqconnection_options = None + if self._use_regional_endpoints: + bqconnection_options = google.api_core.client_options.ClientOptions( + api_endpoint=_BIGQUERYCONNECTION_REGIONAL_ENDPOINT.format( + location=self._location + ) + ) + bqconnection_info = google.api_core.gapic_v1.client_info.ClientInfo( + user_agent=self._application_name + ) + self._bqconnectionclient = ( + google.cloud.bigquery_connection_v1.ConnectionServiceClient( + client_info=bqconnection_info, + client_options=bqconnection_options, + credentials=self._credentials, + ) + ) + + return self._bqconnectionclient + + @property + def bqstoragereadclient(self): + if not self._bqstoragereadclient: + bqstorage_options = None + if self._use_regional_endpoints: + bqstorage_options = google.api_core.client_options.ClientOptions( + api_endpoint=_BIGQUERYSTORAGE_REGIONAL_ENDPOINT.format( + location=self._location + ) + ) + bqstorage_info = google.api_core.gapic_v1.client_info.ClientInfo( + user_agent=self._application_name + ) + self._bqstoragereadclient = ( + google.cloud.bigquery_storage_v1.BigQueryReadClient( + client_info=bqstorage_info, + client_options=bqstorage_options, + credentials=self._credentials, + ) + ) + + return self._bqstoragereadclient + + @property + def cloudfunctionsclient(self): + if not self._cloudfunctionsclient: + functions_info = google.api_core.gapic_v1.client_info.ClientInfo( + user_agent=self._application_name + ) + self._cloudfunctionsclient = ( + google.cloud.functions_v2.FunctionServiceClient( + client_info=functions_info, + credentials=self._credentials, + ) + ) + + return self._cloudfunctionsclient + + @property + def resourcemanagerclient(self): + if not self._resourcemanagerclient: + resourcemanager_info = google.api_core.gapic_v1.client_info.ClientInfo( + user_agent=self._application_name + ) + self._resourcemanagerclient = ( + google.cloud.resourcemanager_v3.ProjectsClient( + credentials=self._credentials, client_info=resourcemanager_info + ) + ) + + return self._resourcemanagerclient diff --git a/bigframes/version.py b/bigframes/version.py index 974fbf1ac9..3f7c8e4399 100644 --- a/bigframes/version.py +++ b/bigframes/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-__version__ = "0.8.0" +__version__ = "0.9.0" diff --git a/docs/reference/bigframes.pandas/series.rst b/docs/reference/bigframes.pandas/series.rst index b179da9ca1..e212904f3f 100644 --- a/docs/reference/bigframes.pandas/series.rst +++ b/docs/reference/bigframes.pandas/series.rst @@ -34,3 +34,11 @@ String handling :members: :inherited-members: :undoc-members: + +Struct handling +^^^^^^^^^^^^^^^ + +.. automodule:: bigframes.operations.structs + :members: + :inherited-members: + :undoc-members: diff --git a/docs/templates/toc.yml b/docs/templates/toc.yml index 0758bb41d8..4fe2ec1a6a 100644 --- a/docs/templates/toc.yml +++ b/docs/templates/toc.yml @@ -39,6 +39,8 @@ uid: bigframes.operations.datetimes.DatetimeMethods - name: StringMethods uid: bigframes.operations.strings.StringMethods + - name: StructAccessor + uid: bigframes.operations.structs.StructAccessor name: Series - name: Window uid: bigframes.core.window.Window diff --git a/notebooks/generative_ai/bq_dataframes_llm_code_generation.ipynb b/notebooks/generative_ai/bq_dataframes_llm_code_generation.ipynb index 2e4ce3e510..0f113b84c6 100644 --- a/notebooks/generative_ai/bq_dataframes_llm_code_generation.ipynb +++ b/notebooks/generative_ai/bq_dataframes_llm_code_generation.ipynb @@ -354,7 +354,7 @@ "id": "DTVtFlqeFbrU" }, "source": [ - "If you want to reset the location of the created DataFrame or Series objects, reset the session by executing `bf.reset_session()`. After that, you can reuse `bf.options.bigquery.location` to specify another location." + "If you want to reset the location of the created DataFrame or Series objects, reset the session by executing `bf.close_session()`. After that, you can reuse `bf.options.bigquery.location` to specify another location." ] }, { diff --git a/notebooks/getting_started/getting_started_bq_dataframes.ipynb b/notebooks/getting_started/getting_started_bq_dataframes.ipynb index 6936e1cf59..6cc6acc993 100644 --- a/notebooks/getting_started/getting_started_bq_dataframes.ipynb +++ b/notebooks/getting_started/getting_started_bq_dataframes.ipynb @@ -383,7 +383,7 @@ "id": "pDfrKwMKE_dK" }, "source": [ - "If you want to reset the location of the created DataFrame or Series objects, reset the session by executing `bf.reset_session()`. After that, you can reuse `bf.options.bigquery.location` to specify another location." + "If you want to reset the location of the created DataFrame or Series objects, reset the session by executing `bf.close_session()`. After that, you can reuse `bf.options.bigquery.location` to specify another location." ] }, { diff --git a/notebooks/regression/bq_dataframes_ml_linear_regression.ipynb b/notebooks/regression/bq_dataframes_ml_linear_regression.ipynb index 338d6edf4f..675416f6ea 100644 --- a/notebooks/regression/bq_dataframes_ml_linear_regression.ipynb +++ b/notebooks/regression/bq_dataframes_ml_linear_regression.ipynb @@ -370,7 +370,7 @@ "id": "D21CoOlfFTYI" }, "source": [ - "If you want to reset the location of the created DataFrame or Series objects, reset the session by executing `bf.reset_session()`. After that, you can reuse `bf.options.bigquery.location` to specify another location." + "If you want to reset the location of the created DataFrame or Series objects, reset the session by executing `bf.close_session()`. After that, you can reuse `bf.options.bigquery.location` to specify another location." 
] }, { diff --git a/noxfile.py b/noxfile.py index 84e5ab11bb..1864da9fe7 100644 --- a/noxfile.py +++ b/noxfile.py @@ -89,7 +89,6 @@ "system", "doctest", "cover", - "release_dry_run", ] # Error if a python version is missing @@ -186,6 +185,7 @@ def run_unit(session, install_test_extra): # Run py.test against the unit tests. tests_path = os.path.join("tests", "unit") + third_party_tests_path = os.path.join("third_party", "bigframes_vendored") session.run( "py.test", "--quiet", @@ -197,6 +197,7 @@ def run_unit(session, install_test_extra): "--cov-report=term-missing", "--cov-fail-under=0", tests_path, + third_party_tests_path, *session.posargs, ) diff --git a/samples/snippets/quickstart_test.py b/samples/snippets/quickstart_test.py index 6b0c69de99..bbe4a8b3c4 100644 --- a/samples/snippets/quickstart_test.py +++ b/samples/snippets/quickstart_test.py @@ -23,7 +23,7 @@ def test_quickstart( capsys: pytest.CaptureFixture[str], ) -> None: # We need a fresh session since we're modifying connection options. - bigframes.pandas.reset_session() + bigframes.pandas.close_session() # TODO(swast): Get project from environment so contributors can run tests. quickstart.run_quickstart("bigframes-dev") diff --git a/samples/snippets/remote_function.py b/samples/snippets/remote_function.py index 9998a23eb2..646d7b0c30 100644 --- a/samples/snippets/remote_function.py +++ b/samples/snippets/remote_function.py @@ -89,19 +89,25 @@ def get_bucket(num): # say we consider the `species`, `island` and `sex` of the penguins # sensitive information and want to redact that by replacing with their hash # code instead. Let's define another scalar custom function and decorate it - # as a remote function + # as a remote function. The custom function in this example has external + # package dependency, which can be specified via `packages` parameter. @bpd.remote_function( - [str], str, bigquery_connection="bigframes-rf-conn", reuse=False + [str], + str, + bigquery_connection="bigframes-rf-conn", + reuse=False, + packages=["cryptography"], ) def get_hash(input): - import hashlib + from cryptography.fernet import Fernet # handle missing value if input is None: input = "" - encoded_input = input.encode() - hash = hashlib.md5(encoded_input) - return hash.hexdigest() + + key = Fernet.generate_key() + f = Fernet(key) + return f.encrypt(input.encode()).decode() # We can use this remote function in another `pandas`-like API `map` that # can be applied on a DataFrame diff --git a/samples/snippets/remote_function_test.py b/samples/snippets/remote_function_test.py index 8b51e46b45..e1317c6ac0 100644 --- a/samples/snippets/remote_function_test.py +++ b/samples/snippets/remote_function_test.py @@ -23,7 +23,7 @@ def test_remote_function_and_read_gbq_function( capsys: pytest.CaptureFixture[str], ) -> None: # We need a fresh session since we're modifying connection options. - bigframes.pandas.reset_session() + bigframes.pandas.close_session() # TODO(swast): Get project from environment so contributors can run tests. 
remote_function.run_remote_function_and_read_gbq_function("bigframes-dev") diff --git a/tests/system/conftest.py b/tests/system/conftest.py index ed22a3e8da..f36a29b0ab 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -17,6 +17,7 @@ import logging import math import pathlib +import textwrap import typing from typing import Dict, Optional @@ -795,6 +796,36 @@ def penguins_randomforest_classifier_model_name( return model_name +@pytest.fixture(scope="session") +def usa_names_grouped_table( + session: bigframes.Session, dataset_id_permanent +) -> bigquery.Table: + """Provides a table with primary key(s) set.""" + table_id = f"{dataset_id_permanent}.usa_names_grouped" + try: + return session.bqclient.get_table(table_id) + except google.cloud.exceptions.NotFound: + query = textwrap.dedent( + f""" + CREATE TABLE `{dataset_id_permanent}.usa_names_grouped` + ( + total_people INT64, + name STRING, + gender STRING, + year INT64, + PRIMARY KEY(name, gender, year) NOT ENFORCED + ) + AS + SELECT SUM(`number`) AS total_people, name, gender, year + FROM `bigquery-public-data.usa_names.usa_1910_2013` + GROUP BY name, gender, year + """ + ) + job = session.bqclient.query(query) + job.result() + return session.bqclient.get_table(table_id) + + @pytest.fixture() def deferred_repr(): bigframes.options.display.repr_mode = "deferred" diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index f270099182..730a1dbde4 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -916,6 +916,51 @@ def square(x): ) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_with_external_package_dependencies( + session, scalars_dfs, dataset_id, bq_cf_connection, functions_client +): + try: + + def pd_np_foo(x): + import numpy as mynp + import pandas as mypd + + return mypd.Series([x, mynp.sqrt(mynp.abs(x))]).sum() + + # Create the remote function with the name provided explicitly + pd_np_foo_remote = session.remote_function( + [int], + float, + dataset_id, + bq_cf_connection, + reuse=False, + packages=["numpy", "pandas >= 2.0.0"], + )(pd_np_foo) + + # The behavior of the created remote function should be as expected + scalars_df, scalars_pandas_df = scalars_dfs + + bf_int64_col = scalars_df["int64_too"] + bf_result_col = bf_int64_col.apply(pd_np_foo_remote) + bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas() + + pd_int64_col = scalars_pandas_df["int64_too"] + pd_result_col = pd_int64_col.apply(pd_np_foo) + pd_result = pd_int64_col.to_frame().assign(result=pd_result_col) + + # pandas result is non-nullable type float64, make it Float64 before + # comparing for the purpose of this test + pd_result.result = pd_result.result.astype(pandas.Float64Dtype()) + + assert_pandas_df_equal_ignore_ordering(bf_result, pd_result) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, functions_client, pd_np_foo_remote + ) + + @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_with_explicit_name_reuse( session, scalars_dfs, dataset_id, bq_cf_connection, functions_client diff --git a/tests/system/small/ml/test_llm.py b/tests/system/small/ml/test_llm.py index e546c09f97..b7257dde1b 100644 --- a/tests/system/small/ml/test_llm.py +++ b/tests/system/small/ml/test_llm.py @@ -30,7 +30,7 @@ def test_create_text_generator_model(palm2_text_generator_model): def 
test_create_text_generator_model_default_session(bq_connection, llm_text_pandas_df): import bigframes.pandas as bpd - bpd.reset_session() + bpd.close_session() bpd.options.bigquery.bq_connection = bq_connection bpd.options.bigquery.location = "us" @@ -53,7 +53,7 @@ def test_create_text_generator_model_default_connection(llm_text_pandas_df): from bigframes import _config import bigframes.pandas as bpd - bpd.reset_session() + bpd.close_session() _config.options = _config.Options() # reset configs llm_text_df = bpd.read_pandas(llm_text_pandas_df) @@ -130,7 +130,7 @@ def test_create_embedding_generator_model(palm2_embedding_generator_model): def test_create_text_embedding_generator_model_defaults(bq_connection): import bigframes.pandas as bpd - bpd.reset_session() + bpd.close_session() bpd.options.bigquery.bq_connection = bq_connection bpd.options.bigquery.location = "us" diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 9f1092d09d..309e8df4f0 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2196,6 +2196,29 @@ def test_loc_single_index_no_duplicate(scalars_df_index, scalars_pandas_df_index ) +def test_at_with_duplicate(scalars_df_index, scalars_pandas_df_index): + scalars_df_index = scalars_df_index.set_index("string_col", drop=False) + scalars_pandas_df_index = scalars_pandas_df_index.set_index( + "string_col", drop=False + ) + index = "Hello, World!" + bf_result = scalars_df_index.at[index, "int64_too"] + pd_result = scalars_pandas_df_index.at[index, "int64_too"] + pd.testing.assert_series_equal( + bf_result.to_pandas(), + pd_result, + ) + + +def test_at_no_duplicate(scalars_df_index, scalars_pandas_df_index): + scalars_df_index = scalars_df_index.set_index("int64_too", drop=False) + scalars_pandas_df_index = scalars_pandas_df_index.set_index("int64_too", drop=False) + index = -2345 + bf_result = scalars_df_index.at[index, "string_col"] + pd_result = scalars_pandas_df_index.at[index, "string_col"] + assert bf_result == pd_result + + def test_loc_setitem_bool_series_scalar_new_col(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs bf_df = scalars_df.copy() @@ -2764,6 +2787,22 @@ def test_loc_list_integer_index(scalars_df_index, scalars_pandas_df_index): ) +def test_loc_list_multiindex(scalars_df_index, scalars_pandas_df_index): + scalars_df_multiindex = scalars_df_index.set_index(["string_col", "int64_col"]) + scalars_pandas_df_multiindex = scalars_pandas_df_index.set_index( + ["string_col", "int64_col"] + ) + index_list = [("Hello, World!", -234892), ("Hello, World!", 123456789)] + + bf_result = scalars_df_multiindex.loc[index_list] + pd_result = scalars_pandas_df_multiindex.loc[index_list] + + pd.testing.assert_frame_equal( + bf_result.to_pandas(), + pd_result, + ) + + def test_iloc_list(scalars_df_index, scalars_pandas_df_index): index_list = [0, 0, 0, 5, 4, 7] @@ -2840,6 +2879,24 @@ def test_loc_bf_series_string_index(scalars_df_index, scalars_pandas_df_index): ) +def test_loc_bf_series_multiindex(scalars_df_index, scalars_pandas_df_index): + pd_string_series = scalars_pandas_df_index.string_col.iloc[[0, 5, 1, 1, 5]] + bf_string_series = scalars_df_index.string_col.iloc[[0, 5, 1, 1, 5]] + + scalars_df_multiindex = scalars_df_index.set_index(["string_col", "int64_col"]) + scalars_pandas_df_multiindex = scalars_pandas_df_index.set_index( + ["string_col", "int64_col"] + ) + + bf_result = scalars_df_multiindex.loc[bf_string_series] + pd_result = 
scalars_pandas_df_multiindex.loc[pd_string_series] + + pd.testing.assert_frame_equal( + bf_result.to_pandas(), + pd_result, + ) + + def test_loc_bf_index_integer_index(scalars_df_index, scalars_pandas_df_index): pd_index = scalars_pandas_df_index.iloc[[0, 5, 1, 1, 5]].index bf_index = scalars_df_index.iloc[[0, 5, 1, 1, 5]].index diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index 3886b85f40..d60083a837 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -16,6 +16,7 @@ import google.api_core.exceptions import pandas as pd +import pyarrow as pa import pytest from tests.system.utils import ( @@ -44,7 +45,7 @@ def test_to_pandas_w_correct_dtypes(scalars_df_default_index): def test_to_pandas_array_struct_correct_result(session): - """In future, we should support arrays and structs with arrow types. + """In future, we should support arrays with arrow types. For now we fall back to the current connector behavior of converting to Python objects""" df = session.read_gbq( @@ -59,11 +60,27 @@ def test_to_pandas_array_struct_correct_result(session): expected = pd.DataFrame( { "array_column": [[1, 3, 2]], - "struct_column": [{"string_field": "a", "float_field": 1.2}], + "struct_column": pd.Series( + [{"string_field": "a", "float_field": 1.2}], + dtype=pd.ArrowDtype( + pa.struct( + [ + ("string_field", pa.string()), + ("float_field", pa.float64()), + ] + ) + ), + ), } ) expected.index = expected.index.astype("Int64") - pd.testing.assert_frame_equal(result, expected) + pd.testing.assert_series_equal(result.dtypes, expected.dtypes) + pd.testing.assert_series_equal(result["array_column"], expected["array_column"]) + # assert_series_equal not implemented for struct columns yet. Compare + # values as Python objects, instead. 
+ pd.testing.assert_series_equal( + result["struct_column"].astype("O"), expected["struct_column"].astype("O") + ) @pytest.mark.parametrize( diff --git a/tests/system/small/test_pandas_options.py b/tests/system/small/test_pandas_options.py index 956b29ae12..ca67710d4e 100644 --- a/tests/system/small/test_pandas_options.py +++ b/tests/system/small/test_pandas_options.py @@ -26,7 +26,7 @@ @pytest.fixture(autouse=True) def reset_default_session_and_location(): - bpd.reset_session() + bpd.close_session() bpd.options.bigquery.location = None @@ -79,8 +79,8 @@ def test_read_gbq_start_sets_session_location( ): read_method(query) - # Reset global session to start over - bpd.reset_session() + # Close global session to start over + bpd.close_session() # There should still be the previous location set in the bigquery options assert bpd.options.bigquery.location == tokyo_location @@ -254,7 +254,7 @@ def test_read_gbq_must_comply_with_set_location_non_US( assert df is not None -def test_reset_session_after_credentials_need_reauthentication(monkeypatch): +def test_close_session_after_credentials_need_reauthentication(monkeypatch): # Use a simple test query to verify that default session works to interact # with BQ test_query = "SELECT 1" @@ -288,8 +288,8 @@ def test_reset_session_after_credentials_need_reauthentication(monkeypatch): with pytest.raises(google.auth.exceptions.RefreshError): bpd.read_gbq(test_query) - # Now verify that resetting the session works - bpd.reset_session() + # Now verify that closing the session works + bpd.close_session() assert bigframes.core.global_session._global_session is None # Now verify that use is able to start over diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 802425510a..bd9edbb1ca 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -1953,6 +1953,16 @@ def test_series_iloc(scalars_df_index, scalars_pandas_df_index, start, stop, ste ) +def test_at(scalars_df_index, scalars_pandas_df_index): + scalars_df_index = scalars_df_index.set_index("int64_too", drop=False) + scalars_pandas_df_index = scalars_pandas_df_index.set_index("int64_too", drop=False) + index = -2345 + bf_result = scalars_df_index["string_col"].at[index] + pd_result = scalars_pandas_df_index["string_col"].at[index] + + assert bf_result == pd_result + + def test_iat(scalars_df_index, scalars_pandas_df_index): bf_result = scalars_df_index["int64_too"].iat[3] pd_result = scalars_pandas_df_index["int64_too"].iat[3] @@ -2614,6 +2624,22 @@ def test_loc_list_integer_index(scalars_df_index, scalars_pandas_df_index): ) +def test_loc_list_multiindex(scalars_df_index, scalars_pandas_df_index): + scalars_df_multiindex = scalars_df_index.set_index(["string_col", "int64_col"]) + scalars_pandas_df_multiindex = scalars_pandas_df_index.set_index( + ["string_col", "int64_col"] + ) + index_list = [("Hello, World!", -234892), ("Hello, World!", 123456789)] + + bf_result = scalars_df_multiindex.int64_too.loc[index_list] + pd_result = scalars_pandas_df_multiindex.int64_too.loc[index_list] + + pd.testing.assert_series_equal( + bf_result.to_pandas(), + pd_result, + ) + + def test_iloc_list(scalars_df_index, scalars_pandas_df_index): index_list = [0, 0, 0, 5, 4, 7] @@ -2671,6 +2697,24 @@ def test_loc_bf_series_string_index(scalars_df_index, scalars_pandas_df_index): ) +def test_loc_bf_series_multiindex(scalars_df_index, scalars_pandas_df_index): + pd_string_series = scalars_pandas_df_index.string_col.iloc[[0, 5, 1, 1, 5]] + bf_string_series = 
scalars_df_index.string_col.iloc[[0, 5, 1, 1, 5]] + + scalars_df_multiindex = scalars_df_index.set_index(["string_col", "int64_col"]) + scalars_pandas_df_multiindex = scalars_pandas_df_index.set_index( + ["string_col", "int64_col"] + ) + + bf_result = scalars_df_multiindex.int64_too.loc[bf_string_series] + pd_result = scalars_pandas_df_multiindex.int64_too.loc[pd_string_series] + + pd.testing.assert_series_equal( + bf_result.to_pandas(), + pd_result, + ) + + def test_loc_bf_index_integer_index(scalars_df_index, scalars_pandas_df_index): pd_index = scalars_pandas_df_index.iloc[[0, 5, 1, 1, 5]].index bf_index = scalars_df_index.iloc[[0, 5, 1, 1, 5]].index diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index bfe9bc8d0f..127a88a760 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -20,6 +20,7 @@ from typing import List import google.api_core.exceptions +import google.cloud.bigquery as bigquery import numpy as np import pandas as pd import pytest @@ -231,6 +232,30 @@ def test_read_gbq_w_anonymous_query_results_table(session: bigframes.Session): pd.testing.assert_frame_equal(result, expected, check_dtype=False) +def test_read_gbq_w_primary_keys_table( + session: bigframes.Session, usa_names_grouped_table: bigquery.Table +): + table = usa_names_grouped_table + # TODO(b/305264153): Use public properties to fetch primary keys once + # added to google-cloud-bigquery. + primary_keys = ( + table._properties.get("tableConstraints", {}) + .get("primaryKey", {}) + .get("columns") + ) + assert len(primary_keys) != 0 + + df = session.read_gbq(f"{table.project}.{table.dataset_id}.{table.table_id}") + result = df.head(100).to_pandas() + + # Verify that the DataFrame is already sorted by primary keys. + sorted_result = result.sort_values(primary_keys) + pd.testing.assert_frame_equal(result, sorted_result) + + # Verify that we're working from a snapshot rather than a copy of the table. + assert "FOR SYSTEM_TIME AS OF TIMESTAMP" in df.sql + + @pytest.mark.parametrize( ("query_or_table", "max_results"), [ diff --git a/tests/unit/_config/test_bigquery_options.py b/tests/unit/_config/test_bigquery_options.py index aeee058319..e5b6cfe2f1 100644 --- a/tests/unit/_config/test_bigquery_options.py +++ b/tests/unit/_config/test_bigquery_options.py @@ -22,11 +22,13 @@ @pytest.mark.parametrize( ["attribute", "original_value", "new_value"], [ + ("application_name", None, "test-partner"), # For credentials, the match is by reference. 
("credentials", object(), object()), ("location", "us-east1", "us-central1"), ("project", "my-project", "my-other-project"), ("bq_connection", "path/to/connection/1", "path/to/connection/2"), + ("use_regional_endpoints", False, True), ], ) def test_setter_raises_if_session_started(attribute, original_value, new_value): @@ -53,10 +55,12 @@ def test_setter_raises_if_session_started(attribute, original_value, new_value): [ (attribute,) for attribute in [ + "application_name", "credentials", "location", "project", "bq_connection", + "use_regional_endpoints", ] ], ) diff --git a/tests/unit/resources.py b/tests/unit/resources.py index c8ed6e86ed..0a68600a35 100644 --- a/tests/unit/resources.py +++ b/tests/unit/resources.py @@ -22,6 +22,7 @@ import bigframes import bigframes.core as core +import bigframes.session.clients """Utilities for creating test resources.""" @@ -37,7 +38,7 @@ def create_bigquery_session( bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) bqclient.project = "test-project" - clients_provider = mock.create_autospec(bigframes.session.ClientsProvider) + clients_provider = mock.create_autospec(bigframes.session.clients.ClientsProvider) type(clients_provider).bqclient = mock.PropertyMock(return_value=bqclient) clients_provider._credentials = credentials diff --git a/tests/unit/session/__init__.py b/tests/unit/session/__init__.py new file mode 100644 index 0000000000..1dc90d1848 --- /dev/null +++ b/tests/unit/session/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/unit/session/test_clients.py b/tests/unit/session/test_clients.py new file mode 100644 index 0000000000..f1b2a5045a --- /dev/null +++ b/tests/unit/session/test_clients.py @@ -0,0 +1,114 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from typing import Optional +import unittest.mock as mock + +import google.api_core.client_info +import google.api_core.client_options +import google.api_core.exceptions +import google.api_core.gapic_v1.client_info +import google.auth.credentials +import google.cloud.bigquery +import google.cloud.bigquery_connection_v1 +import google.cloud.bigquery_storage_v1 +import google.cloud.functions_v2 +import google.cloud.resourcemanager_v3 + +import bigframes.session.clients as clients +import bigframes.version + + +def create_clients_provider(application_name: Optional[str] = None): + credentials = mock.create_autospec(google.auth.credentials.Credentials) + return clients.ClientsProvider( + project="test-project", + location="test-region", + use_regional_endpoints=False, + credentials=credentials, + application_name=application_name, + ) + + +def monkeypatch_client_constructors(monkeypatch): + bqclient = mock.create_autospec(google.cloud.bigquery.Client) + bqclient.return_value = bqclient + monkeypatch.setattr(google.cloud.bigquery, "Client", bqclient) + + bqconnectionclient = mock.create_autospec( + google.cloud.bigquery_connection_v1.ConnectionServiceClient + ) + bqconnectionclient.return_value = bqconnectionclient + monkeypatch.setattr( + google.cloud.bigquery_connection_v1, + "ConnectionServiceClient", + bqconnectionclient, + ) + + bqstoragereadclient = mock.create_autospec( + google.cloud.bigquery_storage_v1.BigQueryReadClient + ) + bqstoragereadclient.return_value = bqstoragereadclient + monkeypatch.setattr( + google.cloud.bigquery_storage_v1, "BigQueryReadClient", bqstoragereadclient + ) + + cloudfunctionsclient = mock.create_autospec( + google.cloud.functions_v2.FunctionServiceClient + ) + cloudfunctionsclient.return_value = cloudfunctionsclient + monkeypatch.setattr( + google.cloud.functions_v2, "FunctionServiceClient", cloudfunctionsclient + ) + + resourcemanagerclient = mock.create_autospec( + google.cloud.resourcemanager_v3.ProjectsClient + ) + resourcemanagerclient.return_value = resourcemanagerclient + monkeypatch.setattr( + google.cloud.resourcemanager_v3, "ProjectsClient", resourcemanagerclient + ) + + +def assert_constructed_w_user_agent(mock_client: mock.Mock, expected_user_agent: str): + assert ( + expected_user_agent + in mock_client.call_args.kwargs["client_info"].to_user_agent() + ) + + +def assert_clients_w_user_agent( + provider: clients.ClientsProvider, expected_user_agent: str +): + assert_constructed_w_user_agent(provider.bqclient, expected_user_agent) + assert_constructed_w_user_agent(provider.bqconnectionclient, expected_user_agent) + assert_constructed_w_user_agent(provider.bqstoragereadclient, expected_user_agent) + assert_constructed_w_user_agent(provider.cloudfunctionsclient, expected_user_agent) + assert_constructed_w_user_agent(provider.resourcemanagerclient, expected_user_agent) + + +def test_user_agent_default(monkeypatch): + monkeypatch_client_constructors(monkeypatch) + provider = create_clients_provider(application_name=None) + assert_clients_w_user_agent(provider, f"bigframes/{bigframes.version.__version__}") + + +def test_user_agent_custom(monkeypatch): + monkeypatch_client_constructors(monkeypatch) + provider = create_clients_provider(application_name="(gpn:testpartner;)") + assert_clients_w_user_agent(provider, "(gpn:testpartner;)") + + # We still need to include attribution to bigframes, even if there's also a + # partner using the package. 
+ assert_clients_w_user_agent(provider, f"bigframes/{bigframes.version.__version__}") diff --git a/tests/unit/test_session.py b/tests/unit/session/test_session.py similarity index 98% rename from tests/unit/test_session.py rename to tests/unit/session/test_session.py index e39a316e5b..18fd42e0f3 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/session/test_session.py @@ -20,7 +20,7 @@ import bigframes -from . import resources +from .. import resources @pytest.mark.parametrize("missing_parts_table_id", [(""), ("table")]) diff --git a/tests/unit/test_clients.py b/tests/unit/test_clients.py index a90e5b0320..f89cc21397 100644 --- a/tests/unit/test_clients.py +++ b/tests/unit/test_clients.py @@ -17,29 +17,22 @@ from bigframes import clients -def test_get_connection_name_full_none(): - connection_name = clients.get_connection_name_full( - None, default_project="default-project", default_location="us" - ) - assert connection_name == "default-project.us.bigframes-default-connection" - - def test_get_connection_name_full_connection_id(): - connection_name = clients.get_connection_name_full( + connection_name = clients.BqConnectionManager.resolve_full_connection_name( "connection-id", default_project="default-project", default_location="us" ) assert connection_name == "default-project.us.connection-id" def test_get_connection_name_full_location_connection_id(): - connection_name = clients.get_connection_name_full( + connection_name = clients.BqConnectionManager.resolve_full_connection_name( "eu.connection-id", default_project="default-project", default_location="us" ) assert connection_name == "default-project.eu.connection-id" def test_get_connection_name_full_all(): - connection_name = clients.get_connection_name_full( + connection_name = clients.BqConnectionManager.resolve_full_connection_name( "my-project.eu.connection-id", default_project="default-project", default_location="us", @@ -48,9 +41,8 @@ def test_get_connection_name_full_all(): def test_get_connection_name_full_raise_value_error(): - with pytest.raises(ValueError): - clients.get_connection_name_full( + clients.BqConnectionManager.resolve_full_connection_name( "my-project.eu.connection-id.extra_field", default_project="default-project", default_location="us", diff --git a/tests/unit/test_pandas.py b/tests/unit/test_pandas.py index 2325fc96a0..5d4f69c7c0 100644 --- a/tests/unit/test_pandas.py +++ b/tests/unit/test_pandas.py @@ -116,7 +116,7 @@ def test_pandas_attribute(): assert bpd.ArrowDtype is pd.ArrowDtype -def test_reset_session_after_bq_session_ended(monkeypatch): +def test_close_session_after_bq_session_ended(monkeypatch): bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) bqclient.project = "test-project" session = resources.create_bigquery_session( @@ -144,7 +144,7 @@ def test_reset_session_after_bq_session_ended(monkeypatch): bpd.read_gbq("SELECT 1") # Even though the query to stop the session raises an exception, we should - # still be able to reset it without raising an error to the user. - bpd.reset_session() + # still be able to close it without raising an error to the user. 
+ bpd.close_session() assert "CALL BQ.ABORT_SESSION('JUST_A_TEST')" in bqclient.query.call_args.args[0] assert bigframes.core.global_session._global_session is None diff --git a/third_party/bigframes_vendored/google_cloud_bigquery/LICENSE b/third_party/bigframes_vendored/google_cloud_bigquery/LICENSE new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/third_party/bigframes_vendored/google_cloud_bigquery/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." 
+ + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. 
We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/third_party/bigframes_vendored/google_cloud_bigquery/__init__.py b/third_party/bigframes_vendored/google_cloud_bigquery/__init__.py new file mode 100644 index 0000000000..1dc90d1848 --- /dev/null +++ b/third_party/bigframes_vendored/google_cloud_bigquery/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/third_party/bigframes_vendored/google_cloud_bigquery/_pandas_helpers.py b/third_party/bigframes_vendored/google_cloud_bigquery/_pandas_helpers.py new file mode 100644 index 0000000000..5e2a7a7ef0 --- /dev/null +++ b/third_party/bigframes_vendored/google_cloud_bigquery/_pandas_helpers.py @@ -0,0 +1,158 @@ +# Original: https://github.com/googleapis/python-bigquery/blob/main/google/cloud/bigquery/_pandas_helpers.py +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Shared helper functions for connecting BigQuery and pandas.""" + +import warnings + +import google.cloud.bigquery.schema as schema +import pyarrow + + +def pyarrow_datetime(): + return pyarrow.timestamp("us", tz=None) + + +def pyarrow_numeric(): + return pyarrow.decimal128(38, 9) + + +def pyarrow_bignumeric(): + # 77th digit is partial. + # https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types + return pyarrow.decimal256(76, 38) + + +def pyarrow_time(): + return pyarrow.time64("us") + + +def pyarrow_timestamp(): + return pyarrow.timestamp("us", tz="UTC") + + +# This dictionary is duplicated in bigquery_storage/test/unite/test_reader.py +# When modifying it be sure to update it there as well. 
+BQ_TO_ARROW_SCALARS = { + "BOOL": pyarrow.bool_, + "BOOLEAN": pyarrow.bool_, + "BYTES": pyarrow.binary, + "DATE": pyarrow.date32, + "DATETIME": pyarrow_datetime, + "FLOAT": pyarrow.float64, + "FLOAT64": pyarrow.float64, + "GEOGRAPHY": pyarrow.string, + "INT64": pyarrow.int64, + "INTEGER": pyarrow.int64, + "NUMERIC": pyarrow_numeric, + "STRING": pyarrow.string, + "TIME": pyarrow_time, + "TIMESTAMP": pyarrow_timestamp, + "BIGNUMERIC": pyarrow_bignumeric, +} +ARROW_SCALAR_IDS_TO_BQ = { + # https://arrow.apache.org/docs/python/api/datatypes.html#type-classes + pyarrow.bool_().id: "BOOL", + pyarrow.int8().id: "INT64", + pyarrow.int16().id: "INT64", + pyarrow.int32().id: "INT64", + pyarrow.int64().id: "INT64", + pyarrow.uint8().id: "INT64", + pyarrow.uint16().id: "INT64", + pyarrow.uint32().id: "INT64", + pyarrow.uint64().id: "INT64", + pyarrow.float16().id: "FLOAT64", + pyarrow.float32().id: "FLOAT64", + pyarrow.float64().id: "FLOAT64", + pyarrow.time32("ms").id: "TIME", + pyarrow.time64("ns").id: "TIME", + pyarrow.timestamp("ns").id: "TIMESTAMP", + pyarrow.date32().id: "DATE", + pyarrow.date64().id: "DATETIME", # because millisecond resolution + pyarrow.binary().id: "BYTES", + pyarrow.string().id: "STRING", # also alias for pyarrow.utf8() + # The exact scale and precision don't matter. Only the type ID matters, + # and it's the same for all decimal128/decimal256 instances. + pyarrow.decimal128(38, scale=9).id: "NUMERIC", + pyarrow.decimal256(76, scale=38).id: "BIGNUMERIC", +} + + +BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA = { + "GEOGRAPHY": { + b"ARROW:extension:name": b"google:sqlType:geography", + b"ARROW:extension:metadata": b'{"encoding": "WKT"}', + }, + "DATETIME": {b"ARROW:extension:name": b"google:sqlType:datetime"}, +} + + +def bq_to_arrow_struct_data_type(field): + arrow_fields = [] + for subfield in field.fields: + arrow_subfield = bq_to_arrow_field(subfield) + if arrow_subfield: + arrow_fields.append(arrow_subfield) + else: + # Could not determine a subfield type. Fallback to type + # inference. + return None + return pyarrow.struct(arrow_fields) + + +def bq_to_arrow_data_type(field): + """Return the Arrow data type, corresponding to a given BigQuery column. + + Returns: + None: if default Arrow type inspection should be used. + """ + if field.mode is not None and field.mode.upper() == "REPEATED": + inner_type = bq_to_arrow_data_type( + schema.SchemaField(field.name, field.field_type, fields=field.fields) + ) + if inner_type: + return pyarrow.list_(inner_type) + return None + + field_type_upper = field.field_type.upper() if field.field_type else "" + if field_type_upper in schema._STRUCT_TYPES: + return bq_to_arrow_struct_data_type(field) + + data_type_constructor = BQ_TO_ARROW_SCALARS.get(field_type_upper) + if data_type_constructor is None: + return None + return data_type_constructor() + + +def bq_to_arrow_field(bq_field, array_type=None): + """Return the Arrow field, corresponding to a given BigQuery column. + + Returns: + None: if the Arrow type cannot be determined. 
+ """ + arrow_type = bq_to_arrow_data_type(bq_field) + if arrow_type is not None: + if array_type is not None: + arrow_type = array_type # For GEOGRAPHY, at least initially + is_nullable = bq_field.mode.upper() == "NULLABLE" + metadata = BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA.get( + bq_field.field_type.upper() if bq_field.field_type else "" + ) + return pyarrow.field( + bq_field.name, arrow_type, nullable=is_nullable, metadata=metadata + ) + + warnings.warn("Unable to determine type for field '{}'.".format(bq_field.name)) + return None diff --git a/third_party/bigframes_vendored/google_cloud_bigquery/tests/__init__.py b/third_party/bigframes_vendored/google_cloud_bigquery/tests/__init__.py new file mode 100644 index 0000000000..1dc90d1848 --- /dev/null +++ b/third_party/bigframes_vendored/google_cloud_bigquery/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/third_party/bigframes_vendored/google_cloud_bigquery/tests/unit/__init__.py b/third_party/bigframes_vendored/google_cloud_bigquery/tests/unit/__init__.py new file mode 100644 index 0000000000..1dc90d1848 --- /dev/null +++ b/third_party/bigframes_vendored/google_cloud_bigquery/tests/unit/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/third_party/bigframes_vendored/google_cloud_bigquery/tests/unit/test_pandas_helpers.py b/third_party/bigframes_vendored/google_cloud_bigquery/tests/unit/test_pandas_helpers.py new file mode 100644 index 0000000000..dc4a09cc54 --- /dev/null +++ b/third_party/bigframes_vendored/google_cloud_bigquery/tests/unit/test_pandas_helpers.py @@ -0,0 +1,413 @@ +# Original: https://github.com/googleapis/python-bigquery/blob/main/tests/unit/test__pandas_helpers.py +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import functools +import warnings + +from google.cloud.bigquery import schema +import pyarrow +import pyarrow.parquet +import pyarrow.types +import pytest + + +@pytest.fixture +def module_under_test(): + from third_party.bigframes_vendored.google_cloud_bigquery import _pandas_helpers + + return _pandas_helpers + + +def is_none(value): + return value is None + + +def is_datetime(type_): + # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime-type + return all_( + pyarrow.types.is_timestamp, + lambda type_: type_.unit == "us", + lambda type_: type_.tz is None, + )(type_) + + +def is_numeric(type_): + # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#numeric-type + return all_( + pyarrow.types.is_decimal, + lambda type_: type_.precision == 38, + lambda type_: type_.scale == 9, + )(type_) + + +def is_bignumeric(type_): + # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#numeric-type + return all_( + pyarrow.types.is_decimal, + lambda type_: type_.precision == 76, + lambda type_: type_.scale == 38, + )(type_) + + +def is_timestamp(type_): + # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp-type + return all_( + pyarrow.types.is_timestamp, + lambda type_: type_.unit == "us", + lambda type_: type_.tz == "UTC", + )(type_) + + +def do_all(functions, value): + return all((func(value) for func in functions)) + + +def all_(*functions): + return functools.partial(do_all, functions) + + +def test_is_datetime(): + assert is_datetime(pyarrow.timestamp("us", tz=None)) + assert not is_datetime(pyarrow.timestamp("ms", tz=None)) + assert not is_datetime(pyarrow.timestamp("us", tz="UTC")) + assert not is_datetime(pyarrow.timestamp("ns", tz="UTC")) + assert not is_datetime(pyarrow.string()) + + +def test_do_all(): + assert do_all((lambda _: True, lambda _: True), None) + assert not do_all((lambda _: True, lambda _: False), None) + assert not do_all((lambda _: False,), None) + + +def test_all_(): + assert all_(lambda _: True, lambda _: True)(None) + assert not all_(lambda _: True, lambda _: False)(None) + + +@pytest.mark.parametrize( + "bq_type,bq_mode,is_correct_type", + [ + ("STRING", "NULLABLE", pyarrow.types.is_string), + ("STRING", None, pyarrow.types.is_string), + ("string", "NULLABLE", pyarrow.types.is_string), + ("StRiNg", "NULLABLE", pyarrow.types.is_string), + ("BYTES", "NULLABLE", pyarrow.types.is_binary), + ("INTEGER", "NULLABLE", pyarrow.types.is_int64), + ("INT64", "NULLABLE", pyarrow.types.is_int64), + ("FLOAT", "NULLABLE", pyarrow.types.is_float64), + ("FLOAT64", "NULLABLE", pyarrow.types.is_float64), + ("NUMERIC", "NULLABLE", is_numeric), + pytest.param( + "BIGNUMERIC", + "NULLABLE", + is_bignumeric, + ), + ("BOOLEAN", "NULLABLE", pyarrow.types.is_boolean), + ("BOOL", "NULLABLE", pyarrow.types.is_boolean), + ("TIMESTAMP", "NULLABLE", is_timestamp), + ("DATE", "NULLABLE", pyarrow.types.is_date32), + ("TIME", "NULLABLE", pyarrow.types.is_time64), + ("DATETIME", "NULLABLE", is_datetime), + ("GEOGRAPHY", "NULLABLE", pyarrow.types.is_string), + ("UNKNOWN_TYPE", "NULLABLE", is_none), + # Use pyarrow.list_(item_type) for repeated (array) fields. 
+ ( + "STRING", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_string(type_.value_type), + ), + ), + ( + "STRING", + "repeated", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_string(type_.value_type), + ), + ), + ( + "STRING", + "RePeAtEd", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_string(type_.value_type), + ), + ), + ( + "BYTES", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_binary(type_.value_type), + ), + ), + ( + "INTEGER", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_int64(type_.value_type), + ), + ), + ( + "INT64", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_int64(type_.value_type), + ), + ), + ( + "FLOAT", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_float64(type_.value_type), + ), + ), + ( + "FLOAT64", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_float64(type_.value_type), + ), + ), + ( + "NUMERIC", + "REPEATED", + all_(pyarrow.types.is_list, lambda type_: is_numeric(type_.value_type)), + ), + pytest.param( + "BIGNUMERIC", + "REPEATED", + all_(pyarrow.types.is_list, lambda type_: is_bignumeric(type_.value_type)), + ), + ( + "BOOLEAN", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_boolean(type_.value_type), + ), + ), + ( + "BOOL", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_boolean(type_.value_type), + ), + ), + ( + "TIMESTAMP", + "REPEATED", + all_(pyarrow.types.is_list, lambda type_: is_timestamp(type_.value_type)), + ), + ( + "DATE", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_date32(type_.value_type), + ), + ), + ( + "TIME", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_time64(type_.value_type), + ), + ), + ( + "DATETIME", + "REPEATED", + all_(pyarrow.types.is_list, lambda type_: is_datetime(type_.value_type)), + ), + ( + "GEOGRAPHY", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_string(type_.value_type), + ), + ), + ("RECORD", "REPEATED", is_none), + ("UNKNOWN_TYPE", "REPEATED", is_none), + ], +) +def test_bq_to_arrow_data_type(module_under_test, bq_type, bq_mode, is_correct_type): + field = schema.SchemaField("ignored_name", bq_type, mode=bq_mode) + actual = module_under_test.bq_to_arrow_data_type(field) + assert is_correct_type(actual) + + +@pytest.mark.parametrize("bq_type", ["RECORD", "record", "STRUCT", "struct"]) +def test_bq_to_arrow_data_type_w_struct(module_under_test, bq_type): + fields = ( + schema.SchemaField("field01", "STRING"), + schema.SchemaField("field02", "BYTES"), + schema.SchemaField("field03", "INTEGER"), + schema.SchemaField("field04", "INT64"), + schema.SchemaField("field05", "FLOAT"), + schema.SchemaField("field06", "FLOAT64"), + schema.SchemaField("field07", "NUMERIC"), + schema.SchemaField("field08", "BIGNUMERIC"), + schema.SchemaField("field09", "BOOLEAN"), + schema.SchemaField("field10", "BOOL"), + schema.SchemaField("field11", "TIMESTAMP"), + schema.SchemaField("field12", "DATE"), + schema.SchemaField("field13", "TIME"), + schema.SchemaField("field14", "DATETIME"), + schema.SchemaField("field15", "GEOGRAPHY"), + ) + + field = schema.SchemaField("ignored_name", bq_type, mode="NULLABLE", fields=fields) + actual = module_under_test.bq_to_arrow_data_type(field) + + expected = ( + pyarrow.field("field01", pyarrow.string()), + 
pyarrow.field("field02", pyarrow.binary()), + pyarrow.field("field03", pyarrow.int64()), + pyarrow.field("field04", pyarrow.int64()), + pyarrow.field("field05", pyarrow.float64()), + pyarrow.field("field06", pyarrow.float64()), + pyarrow.field("field07", module_under_test.pyarrow_numeric()), + pyarrow.field("field08", module_under_test.pyarrow_bignumeric()), + pyarrow.field("field09", pyarrow.bool_()), + pyarrow.field("field10", pyarrow.bool_()), + pyarrow.field("field11", module_under_test.pyarrow_timestamp()), + pyarrow.field("field12", pyarrow.date32()), + pyarrow.field("field13", module_under_test.pyarrow_time()), + pyarrow.field("field14", module_under_test.pyarrow_datetime()), + pyarrow.field("field15", pyarrow.string()), + ) + expected = pyarrow.struct(expected) + + assert pyarrow.types.is_struct(actual) + assert actual.num_fields == len(fields) + assert actual.equals(expected) + + +@pytest.mark.parametrize("bq_type", ["RECORD", "record", "STRUCT", "struct"]) +def test_bq_to_arrow_data_type_w_array_struct(module_under_test, bq_type): + fields = ( + schema.SchemaField("field01", "STRING"), + schema.SchemaField("field02", "BYTES"), + schema.SchemaField("field03", "INTEGER"), + schema.SchemaField("field04", "INT64"), + schema.SchemaField("field05", "FLOAT"), + schema.SchemaField("field06", "FLOAT64"), + schema.SchemaField("field07", "NUMERIC"), + schema.SchemaField("field08", "BIGNUMERIC"), + schema.SchemaField("field09", "BOOLEAN"), + schema.SchemaField("field10", "BOOL"), + schema.SchemaField("field11", "TIMESTAMP"), + schema.SchemaField("field12", "DATE"), + schema.SchemaField("field13", "TIME"), + schema.SchemaField("field14", "DATETIME"), + schema.SchemaField("field15", "GEOGRAPHY"), + ) + + field = schema.SchemaField("ignored_name", bq_type, mode="REPEATED", fields=fields) + actual = module_under_test.bq_to_arrow_data_type(field) + + expected = ( + pyarrow.field("field01", pyarrow.string()), + pyarrow.field("field02", pyarrow.binary()), + pyarrow.field("field03", pyarrow.int64()), + pyarrow.field("field04", pyarrow.int64()), + pyarrow.field("field05", pyarrow.float64()), + pyarrow.field("field06", pyarrow.float64()), + pyarrow.field("field07", module_under_test.pyarrow_numeric()), + pyarrow.field("field08", module_under_test.pyarrow_bignumeric()), + pyarrow.field("field09", pyarrow.bool_()), + pyarrow.field("field10", pyarrow.bool_()), + pyarrow.field("field11", module_under_test.pyarrow_timestamp()), + pyarrow.field("field12", pyarrow.date32()), + pyarrow.field("field13", module_under_test.pyarrow_time()), + pyarrow.field("field14", module_under_test.pyarrow_datetime()), + pyarrow.field("field15", pyarrow.string()), + ) + expected_value_type = pyarrow.struct(expected) + + assert pyarrow.types.is_list(actual) + assert pyarrow.types.is_struct(actual.value_type) + assert actual.value_type.num_fields == len(fields) + assert actual.value_type.equals(expected_value_type) + + +def test_bq_to_arrow_data_type_w_struct_unknown_subfield(module_under_test): + fields = ( + schema.SchemaField("field1", "STRING"), + schema.SchemaField("field2", "INTEGER"), + # Don't know what to convert UNKNOWN_TYPE to, let type inference work, + # instead. 
+ schema.SchemaField("field3", "UNKNOWN_TYPE"), + ) + field = schema.SchemaField("ignored_name", "RECORD", mode="NULLABLE", fields=fields) + + with warnings.catch_warnings(record=True) as warned: + actual = module_under_test.bq_to_arrow_data_type(field) + + assert actual is None + assert len(warned) == 1 + warning = warned[0] + assert "field3" in str(warning) + + +def test_bq_to_arrow_field_type_override(module_under_test): + # When loading pandas data, we may need to override the type + # decision based on data contents, because GEOGRAPHY data can be + # stored as either text or binary. + + assert ( + module_under_test.bq_to_arrow_field(schema.SchemaField("g", "GEOGRAPHY")).type + == pyarrow.string() + ) + + assert ( + module_under_test.bq_to_arrow_field( + schema.SchemaField("g", "GEOGRAPHY"), + pyarrow.binary(), + ).type + == pyarrow.binary() + ) + + +@pytest.mark.parametrize( + "field_type, metadata", + [ + ("datetime", {b"ARROW:extension:name": b"google:sqlType:datetime"}), + ( + "geography", + { + b"ARROW:extension:name": b"google:sqlType:geography", + b"ARROW:extension:metadata": b'{"encoding": "WKT"}', + }, + ), + ], +) +def test_bq_to_arrow_field_metadata(module_under_test, field_type, metadata): + assert ( + module_under_test.bq_to_arrow_field( + schema.SchemaField("g", field_type) + ).metadata + == metadata + ) diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index e54f984d59..621d052cb8 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -2112,3 +2112,8 @@ def iloc(self): def iat(self): """Access a single value for a row/column pair by integer position.""" raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + @property + def at(self): + """Access a single value for a row/column label pair.""" + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py index 03729922d5..bd1f9a9a18 100644 --- a/third_party/bigframes_vendored/pandas/core/series.py +++ b/third_party/bigframes_vendored/pandas/core/series.py @@ -22,6 +22,23 @@ class Series(NDFrame): # type: ignore[misc] def dt(self): """ Accessor object for datetime-like properties of the Series values. + + Returns: + bigframes.operations.datetimes.DatetimeMethods: + An accessor containing datetime methods. + + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + @property + def struct(self): + """ + Accessor object for struct properties of the Series values. + + Returns: + bigframes.operations.structs.StructAccessor: + An accessor containing struct methods. + """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1733,6 +1750,10 @@ def str(self): NAs stay NA unless handled otherwise by a particular method. Patterned after Python’s string methods, with some inspiration from R’s stringr package. + + Returns: + bigframes.operations.strings.StringMethods: + An accessor containing string methods. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1833,3 +1854,8 @@ def iloc(self): def iat(self): """Access a single value for a row/column pair by integer position.""" raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + @property + def at(self): + """Access a single value for a row/column label pair.""" + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
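For reference, a minimal sketch of how the vendored `_pandas_helpers` module introduced above maps BigQuery schema fields to Arrow types, mirroring the assertions in the new `test_pandas_helpers.py`. The import path follows the `third_party/bigframes_vendored` layout added in this change; the field names (`created_at`, `tags`, `mystery`) are purely illustrative.

    from google.cloud.bigquery import schema
    import pyarrow

    from third_party.bigframes_vendored.google_cloud_bigquery import _pandas_helpers

    # Scalar columns resolve through BQ_TO_ARROW_SCALARS; NULLABLE mode maps to a
    # nullable Arrow field. ("created_at" is an illustrative field name.)
    ts_field = _pandas_helpers.bq_to_arrow_field(
        schema.SchemaField("created_at", "TIMESTAMP", mode="NULLABLE")
    )
    assert ts_field.type == pyarrow.timestamp("us", tz="UTC")
    assert ts_field.nullable

    # REPEATED columns become Arrow list types wrapping the scalar type.
    tags_type = _pandas_helpers.bq_to_arrow_data_type(
        schema.SchemaField("tags", "STRING", mode="REPEATED")
    )
    assert pyarrow.types.is_list(tags_type)
    assert pyarrow.types.is_string(tags_type.value_type)

    # Unrecognized types resolve to None so callers can fall back to Arrow's own
    # type inference (bq_to_arrow_field additionally warns in that case).
    assert (
        _pandas_helpers.bq_to_arrow_data_type(schema.SchemaField("mystery", "UNKNOWN_TYPE"))
        is None
    )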