diff --git a/snippets/auth/index.py b/snippets/auth/index.py index b1c091064..428c54e09 100644 --- a/snippets/auth/index.py +++ b/snippets/auth/index.py @@ -25,6 +25,7 @@ from firebase_admin import credentials from firebase_admin import auth from firebase_admin import exceptions +from firebase_admin import tenant_mgt sys.path.append("lib") @@ -634,6 +635,418 @@ def send_custom_email(email, link): del email del link +def create_saml_provider_config(): + # [START create_saml_provider] + saml = auth.create_saml_provider_config( + display_name='SAML provider name', + enabled=True, + provider_id='saml.myProvider', + idp_entity_id='IDP_ENTITY_ID', + sso_url='https://example.com/saml/sso/1234/', + x509_certificates=[ + '-----BEGIN CERTIFICATE-----\nCERT1...\n-----END CERTIFICATE-----', + '-----BEGIN CERTIFICATE-----\nCERT2...\n-----END CERTIFICATE-----', + ], + rp_entity_id='P_ENTITY_ID', + callback_url='https://project-id.firebaseapp.com/__/auth/handler') + + print('Created new SAML provider:', saml.provider_id) + # [END create_saml_provider] + +def update_saml_provider_config(): + # [START update_saml_provider] + saml = auth.update_saml_provider_config( + 'saml.myProvider', + x509_certificates=[ + '-----BEGIN CERTIFICATE-----\nCERT2...\n-----END CERTIFICATE-----', + '-----BEGIN CERTIFICATE-----\nCERT3...\n-----END CERTIFICATE-----', + ]) + + print('Updated SAML provider:', saml.provider_id) + # [END update_saml_provider] + +def get_saml_provider_config(): + # [START get_saml_provider] + saml = auth.get_saml_provider_config('saml.myProvider') + print(saml.display_name, saml.enabled) + # [END get_saml_provider] + +def delete_saml_provider_config(): + # [START delete_saml_provider] + auth.delete_saml_provider_config('saml.myProvider') + # [END delete_saml_provider] + +def list_saml_provider_configs(): + # [START list_saml_providers] + for saml in auth.list_saml_provider_configs('nextPageToken').iterate_all(): + print(saml.provider_id) + # [END list_saml_providers] + +def create_oidc_provider_config(): + # [START create_oidc_provider] + oidc = auth.create_oidc_provider_config( + display_name='OIDC provider name', + enabled=True, + provider_id='oidc.myProvider', + client_id='CLIENT_ID2', + issuer='https://oidc.com/CLIENT_ID2') + + print('Created new OIDC provider:', oidc.provider_id) + # [END create_oidc_provider] + +def update_oidc_provider_config(): + # [START update_oidc_provider] + oidc = auth.update_oidc_provider_config( + 'oidc.myProvider', + client_id='CLIENT_ID', + issuer='https://oidc.com') + + print('Updated OIDC provider:', oidc.provider_id) + # [END update_oidc_provider] + +def get_oidc_provider_config(): + # [START get_oidc_provider] + oidc = auth.get_oidc_provider_config('oidc.myProvider') + + print(oidc.display_name, oidc.enabled) + # [END get_oidc_provider] + +def delete_oidc_provider_config(): + # [START delete_oidc_provider] + auth.delete_oidc_provider_config('oidc.myProvider') + # [END delete_oidc_provider] + +def list_oidc_provider_configs(): + # [START list_oidc_providers] + for oidc in auth.list_oidc_provider_configs('nextPageToken').iterate_all(): + print(oidc.provider_id) + # [END list_oidc_providers] + +def get_tenant_client(tenant_id): + # [START get_tenant_client] + from firebase_admin import tenant_mgt + + tenant_client = tenant_mgt.auth_for_tenant(tenant_id) + # [END get_tenant_client] + return tenant_client + +def get_tenant(tenant_id): + # [START get_tenant] + tenant = tenant_mgt.get_tenant(tenant_id) + + print('Retreieved tenant:', tenant.tenant_id) + # [END get_tenant] + +def create_tenant(): + # [START create_tenant] + tenant = tenant_mgt.create_tenant( + display_name='myTenant1', + enable_email_link_sign_in=True, + allow_password_sign_up=True) + + print('Created tenant:', tenant.tenant_id) + # [END create_tenant] + +def update_tenant(tenant_id): + # [START update_tenant] + tenant = tenant_mgt.update_tenant( + tenant_id, + display_name='updatedName', + allow_password_sign_up=False) # Disable email provider + + print('Updated tenant:', tenant.tenant_id) + # [END update_tenant] + +def delete_tenant(tenant_id): + # [START delete_tenant] + tenant_mgt.delete_tenant(tenant_id) + # [END delete_tenant] + +def list_tenants(): + # [START list_tenants] + for tenant in tenant_mgt.list_tenants().iterate_all(): + print('Retrieved tenant:', tenant.tenant_id) + # [END list_tenants] + +def create_provider_tenant(): + # [START get_tenant_client_short] + tenant_client = tenant_mgt.auth_for_tenant('TENANT-ID') + # [END get_tenant_client_short] + + # [START create_saml_provider_tenant] + saml = tenant_client.create_saml_provider_config( + display_name='SAML provider name', + enabled=True, + provider_id='saml.myProvider', + idp_entity_id='IDP_ENTITY_ID', + sso_url='https://example.com/saml/sso/1234/', + x509_certificates=[ + '-----BEGIN CERTIFICATE-----\nCERT1...\n-----END CERTIFICATE-----', + '-----BEGIN CERTIFICATE-----\nCERT2...\n-----END CERTIFICATE-----', + ], + rp_entity_id='P_ENTITY_ID', + callback_url='https://project-id.firebaseapp.com/__/auth/handler') + + print('Created new SAML provider:', saml.provider_id) + # [END create_saml_provider_tenant] + +def update_provider_tenant(tenant_client): + # [START update_saml_provider_tenant] + saml = tenant_client.update_saml_provider_config( + 'saml.myProvider', + x509_certificates=[ + '-----BEGIN CERTIFICATE-----\nCERT2...\n-----END CERTIFICATE-----', + '-----BEGIN CERTIFICATE-----\nCERT3...\n-----END CERTIFICATE-----', + ]) + + print('Updated SAML provider:', saml.provider_id) + # [END update_saml_provider_tenant] + +def get_provider_tenant(tennat_client): + # [START get_saml_provider_tenant] + saml = tennat_client.get_saml_provider_config('saml.myProvider') + print(saml.display_name, saml.enabled) + # [END get_saml_provider_tenant] + +def list_provider_configs_tenant(tenant_client): + # [START list_saml_providers_tenant] + for saml in tenant_client.list_saml_provider_configs('nextPageToken').iterate_all(): + print(saml.provider_id) + # [END list_saml_providers_tenant] + +def delete_provider_config_tenant(tenant_client): + # [START delete_saml_provider_tenant] + tenant_client.delete_saml_provider_config('saml.myProvider') + # [END delete_saml_provider_tenant] + +def get_user_tenant(tenant_client): + uid = 'some_string_uid' + + # [START get_user_tenant] + # Get an auth.Client from tenant_mgt.auth_for_tenant() + user = tenant_client.get_user(uid) + print('Successfully fetched user data:', user.uid) + # [END get_user_tenant] + +def get_user_by_email_tenant(tenant_client): + email = 'some@email.com' + # [START get_user_by_email_tenant] + user = tenant_client.get_user_by_email(email) + print('Successfully fetched user data:', user.uid) + # [END get_user_by_email_tenant] + +def create_user_tenant(tenant_client): + # [START create_user_tenant] + user = tenant_client.create_user( + email='user@example.com', + email_verified=False, + phone_number='+15555550100', + password='secretPassword', + display_name='John Doe', + photo_url='http://www.example.com/12345678/photo.png', + disabled=False) + print('Sucessfully created new user:', user.uid) + # [END create_user_tenant] + +def update_user_tenant(tenant_client, uid): + # [START update_user_tenant] + user = tenant_client.update_user( + uid, + email='user@example.com', + phone_number='+15555550100', + email_verified=True, + password='newPassword', + display_name='John Doe', + photo_url='http://www.example.com/12345678/photo.png', + disabled=True) + print('Sucessfully updated user:', user.uid) + # [END update_user_tenant] + +def delete_user_tenant(tenant_client, uid): + # [START delete_user_tenant] + tenant_client.delete_user(uid) + print('Successfully deleted user') + # [END delete_user_tenant] + +def list_users_tenant(tenant_client): + # [START list_all_users_tenant] + # Note, behind the scenes, the iterator will retrive 1000 users at a time through the API + for user in tenant_client.list_users().iterate_all(): + print('User: ' + user.uid) + + # Iterating by pages of 1000 users at a time. + page = tenant_client.list_users() + while page: + for user in page.users: + print('User: ' + user.uid) + # Get next batch of users. + page = page.get_next_page() + # [END list_all_users_tenant] + +def import_with_hmac_tenant(tenant_client): + # [START import_with_hmac_tenant] + users = [ + auth.ImportUserRecord( + uid='uid1', + email='user1@example.com', + password_hash=b'password_hash_1', + password_salt=b'salt1' + ), + auth.ImportUserRecord( + uid='uid2', + email='user2@example.com', + password_hash=b'password_hash_2', + password_salt=b'salt2' + ), + ] + + hash_alg = auth.UserImportHash.hmac_sha256(key=b'secret') + try: + result = tenant_client.import_users(users, hash_alg=hash_alg) + for err in result.errors: + print('Failed to import user:', err.reason) + except exceptions.FirebaseError as error: + print('Error importing users:', error) + # [END import_with_hmac_tenant] + +def import_without_password_tenant(tenant_client): + # [START import_without_password_tenant] + users = [ + auth.ImportUserRecord( + uid='some-uid', + display_name='John Doe', + email='johndoe@gmail.com', + photo_url='http://www.example.com/12345678/photo.png', + email_verified=True, + phone_number='+11234567890', + custom_claims={'admin': True}, # set this user as admin + provider_data=[ # user with SAML provider + auth.UserProvider( + uid='saml-uid', + email='johndoe@gmail.com', + display_name='John Doe', + photo_url='http://www.example.com/12345678/photo.png', + provider_id='saml.acme' + ) + ], + ), + ] + try: + result = tenant_client.import_users(users) + for err in result.errors: + print('Failed to import user:', err.reason) + except exceptions.FirebaseError as error: + print('Error importing users:', error) + # [END import_without_password_tenant] + +def verify_id_token_tenant(tenant_client, id_token): + # [START verify_id_token_tenant] + # id_token comes from the client app + try: + decoded_token = tenant_client.verify_id_token(id_token) + + # This should be set to TENANT-ID. Otherwise TenantIdMismatchError error raised. + print('Verified ID token from tenant:', decoded_token['firebase']['tenant']) + except tenant_mgt.TenantIdMismatchError: + # Token revoked, inform the user to reauthenticate or signOut(). + pass + # [END verify_id_token_tenant] + +def verify_id_token_access_control_tenant(id_token): + # [START id_token_access_control_tenant] + decoded_token = auth.verify_id_token(id_token) + + tenant = decoded_token['firebase']['tenant'] + if tenant == 'TENANT-ID1': + # Allow appropriate level of access for TENANT-ID1. + pass + elif tenant == 'TENANT-ID2': + # Allow appropriate level of access for TENANT-ID2. + pass + else: + # Access not allowed -- Handle error + pass + # [END id_token_access_control_tenant] + +def revoke_refresh_tokens_tenant(tenant_client, uid): + # [START revoke_tokens_tenant] + # Revoke all refresh tokens for a specified user in a specified tenant for whatever reason. + # Retrieve the timestamp of the revocation, in seconds since the epoch. + tenant_client.revoke_refresh_tokens(uid) + + user = tenant_client.get_user(uid) + # Convert to seconds as the auth_time in the token claims is in seconds. + revocation_second = user.tokens_valid_after_timestamp / 1000 + print('Tokens revoked at: {0}'.format(revocation_second)) + # [END revoke_tokens_tenant] + +def verify_id_token_and_check_revoked_tenant(tenant_client, id_token): + # [START verify_id_token_and_check_revoked_tenant] + # Verify the ID token for a specific tenant while checking if the token is revoked. + try: + # Verify the ID token while checking if the token is revoked by + # passing check_revoked=True. + decoded_token = tenant_client.verify_id_token(id_token, check_revoked=True) + # Token is valid and not revoked. + uid = decoded_token['uid'] + except tenant_mgt.TenantIdMismatchError: + # Token belongs to a different tenant. + pass + except auth.RevokedIdTokenError: + # Token revoked, inform the user to reauthenticate or signOut(). + pass + except auth.InvalidIdTokenError: + # Token is invalid + pass + # [END verify_id_token_and_check_revoked_tenant] + +def custom_claims_set_tenant(tenant_client, uid): + # [START set_custom_user_claims_tenant] + # Set admin privilege on the user corresponding to uid. + tenant_client.set_custom_user_claims(uid, {'admin': True}) + # The new custom claims will propagate to the user's ID token the + # next time a new one is issued. + # [END set_custom_user_claims_tenant] + +def custom_claims_verify_tenant(tenant_client, id_token): + # [START verify_custom_claims_tenant] + # Verify the ID token first. + claims = tenant_client.verify_id_token(id_token) + if claims['admin'] is True: + # Allow access to requested admin resource. + pass + # [END verify_custom_claims_tenant] + +def custom_claims_read_tenant(tenant_client, uid): + # [START read_custom_user_claims_tenant] + # Lookup the user associated with the specified uid. + user = tenant_client.get_user(uid) + + # The claims can be accessed on the user record. + print(user.custom_claims.get('admin')) + # [END read_custom_user_claims_tenant] + +def generate_email_verification_link_tenant(tenant_client): + # [START email_verification_link_tenant] + action_code_settings = auth.ActionCodeSettings( + url='https://www.example.com/checkout?cartId=1234', + handle_code_in_app=True, + ios_bundle_id='com.example.ios', + android_package_name='com.example.android', + android_install_app=True, + android_minimum_version='12', + # FDL custom domain. + dynamic_link_domain='coolapp.page.link', + ) + + email = 'user@example.com' + link = tenant_client.generate_email_verification_link(email, action_code_settings) + # Construct email from a template embedding the link, and send + # using a custom SMTP server. + send_custom_email(email, link) + # [END email_verification_link_tenant] + + initialize_sdk_with_service_account() initialize_sdk_with_application_default() #initialize_sdk_with_refresh_token()