"""FPL Northwest data collection api client""" from datetime import datetime, timedelta import logging import async_timeout import boto3 from .const import TIMEOUT, API_HOST from .aws_srp import AWSSRP from .const import LOGIN_RESULT_OK USER_POOL_ID = "us-east-1_w09KCowou" CLIENT_ID = "4k78t7970hhdgtafurk158dr3a" ACCOUNT_STATUS_ACTIVE = "ACT" _LOGGER = logging.getLogger(__package__) class FplNorthwestRegionApiClient: """FPL Northwest Api client""" def __init__(self, username, password, loop, session) -> None: self.session = session self.username = username self.password = password self.loop = loop self.id_token = None self.access_token = None self.refresh_token = None async def login(self): """login using aws""" client = await self.loop.run_in_executor( None, boto3.client, "cognito-idp", "us-east-1" ) aws = AWSSRP( username=self.username, password=self.password, pool_id=USER_POOL_ID, client_id=CLIENT_ID, loop=self.loop, client=client, ) tokens = await aws.authenticate_user() if "AccessToken" in tokens["AuthenticationResult"]: self.access_token = tokens["AuthenticationResult"]["AccessToken"] self.refresh_token = tokens["AuthenticationResult"]["RefreshToken"] self.id_token = tokens["AuthenticationResult"]["IdToken"] # Get User headers = { "Content-Type": "application/x-amz-json-1.1", "X-Amz-Target": "AWSCognitoIdentityProviderService.GetUser", } JSON = {"AccessToken": self.access_token} async with async_timeout.timeout(TIMEOUT): response = await self.session.post( "https://cognito-idp.us-east-1.amazonaws.com/", headers=headers, json=JSON, ) if response.status == 200: data = await response.json(content_type="application/x-amz-json-1.1") # InitiateAuth headers = { "Content-Type": "application/x-amz-json-1.1", "X-Amz-Target": "AWSCognitoIdentityProviderService.InitiateAuth", } payload = { "AuthFlow": "REFRESH_TOKEN_AUTH", "AuthParameters": { "DEVICE_KEY": None, "REFRESH_TOKEN": self.refresh_token, }, "ClientId": "4k78t7970hhdgtafurk158dr3a", } async with async_timeout.timeout(TIMEOUT): response = await self.session.post( "https://cognito-idp.us-east-1.amazonaws.com/", headers=headers, json=payload, ) if response.status == 200: data = await response.json(content_type="application/x-amz-json-1.1") self.access_token = data["AuthenticationResult"]["AccessToken"] self.id_token = tokens["AuthenticationResult"]["IdToken"] return LOGIN_RESULT_OK async def get_open_accounts(self): """ Returns the open accounts """ result = [] URL = API_HOST + "/cs/gulf/ssp/v1/profile/accounts/list" headers = {"Authorization": f"Bearer {self.id_token}"} async with async_timeout.timeout(TIMEOUT): response = await self.session.get(URL, headers=headers) if response.status == 200: data = await response.json() for account in data["accounts"]: if account["accountStatus"] == ACCOUNT_STATUS_ACTIVE: result.append(account["accountNumber"]) return result async def logout(self): """log out from fpl""" async def update(self, account): """ Returns the data collected from fpl """ URL = ( API_HOST + f"/cs/gulf/ssp/v1/accountservices/account/{account}/accountSummary?balance=y" ) headers = {"Authorization": f"Bearer {self.id_token}"} async with async_timeout.timeout(TIMEOUT): response = await self.session.get(URL, headers=headers) result = {} if response.status == 200: data = await response.json() accountSumary = data["accountSummary"]["accountSummaryData"] billAndMetterInfo = accountSumary["billAndMeterInfo"] programInfo = accountSumary["programInfo"] result["budget_bill"] = False result["projected_bill"] = float(billAndMetterInfo["projBillAmount"] or 0) result["projectedKWH"] = int(billAndMetterInfo["projBillKWH"] or 0) result["bill_to_date"] = float(billAndMetterInfo["asOfDateAmount"] or 0) result["billToDateKWH"] = int(billAndMetterInfo["asOfDateUsage"] or 0) result["daily_avg"] = float(billAndMetterInfo["dailyAvgAmount"] or 0) result["dailyAverageKWH"] = int(billAndMetterInfo["dailyAvgKwh"] or 0) start = datetime.fromisoformat(programInfo["currentBillDate"]) # + timedelta(days=1) end = datetime.fromisoformat(programInfo["nextBillDate"]) today = datetime.fromisoformat(data["today"]) # result["billStartDate"] = programInfo["currentBillDate"] result["current_bill_date"] = start.strftime("%Y-%m-%d") result["next_bill_date"] = programInfo["nextBillDate"] service_days = (end - start).days as_of_days = (today - start).days result["service_days"] = service_days result["as_of_days"] = as_of_days result["remaining_days"] = service_days - as_of_days return result