Merge branch 'dev'

This commit is contained in:
Yordan Suarez
2022-08-23 09:00:49 -04:00
19 changed files with 1265 additions and 626 deletions

View File

@@ -1,4 +1,4 @@
FROM python:3.8
FROM python:3.10
RUN apt-get update \
&& apt-get install -y --no-install-recommends \

View File

@@ -1,6 +1,6 @@
// See https://aka.ms/vscode-remote/devcontainer.json for format details.
{
"image": "ghcr.io/ludeeus/devcontainer/integration:stable",
"image": "ghcr.io/ludeeus/devcontainer/integration:latest",
"name": "FPL integration development",
"context": "..",
"appPort": [

2
.gitignore vendored
View File

@@ -8,3 +8,5 @@ custom_components/fpl/__pycache__/
test.py
custom_components/fpl1/__pycache__/
custom_components/fpl/secrets.py

View File

@@ -0,0 +1,384 @@
"""FPL Main region data collection api client"""
import json
import logging
from datetime import datetime, timedelta
import aiohttp
import async_timeout
from .const import (
API_HOST,
LOGIN_RESULT_FAILURE,
LOGIN_RESULT_INVALIDPASSWORD,
LOGIN_RESULT_INVALIDUSER,
LOGIN_RESULT_OK,
TIMEOUT,
)
STATUS_CATEGORY_OPEN = "OPEN"
URL_LOGIN = API_HOST + "/api/resources/login"
URL_BUDGET_BILLING_GRAPH = (
API_HOST + "/api/resources/account/{account}/budgetBillingGraph"
)
URL_RESOURCES_PROJECTED_BILL = (
API_HOST
+ "/api/resources/account/{account}/projectedBill"
+ "?premiseNumber={premise}&lastBilledDate={lastBillDate}"
)
URL_APPLIANCE_USAGE = (
API_HOST + "/dashboard-api/resources/account/{account}/applianceUsage/{account}"
)
URL_BUDGET_BILLING_PREMISE_DETAILS = (
API_HOST + "/api/resources/account/{account}/budgetBillingGraph/premiseDetails"
)
ENROLLED = "ENROLLED"
NOTENROLLED = "NOTENROLLED"
_LOGGER = logging.getLogger(__package__)
class FplMainRegionApiClient:
"""Fpl Main Region Api Client"""
def __init__(self, username, password, loop, session) -> None:
self.session = session
self.username = username
self.password = password
self.loop = loop
async def login(self):
"""login into fpl"""
# login and get account information
async with async_timeout.timeout(TIMEOUT):
response = await self.session.get(
URL_LOGIN,
auth=aiohttp.BasicAuth(self.username, self.password),
)
if response.status == 200:
return LOGIN_RESULT_OK
if response.status == 401:
json_data = json.loads(await response.text())
if json_data["messageCode"] == LOGIN_RESULT_INVALIDUSER:
return LOGIN_RESULT_INVALIDUSER
if json_data["messageCode"] == LOGIN_RESULT_INVALIDPASSWORD:
return LOGIN_RESULT_INVALIDPASSWORD
return LOGIN_RESULT_FAILURE
async def get_open_accounts(self):
"""
Get open accounts
Returns array with active account numbers
"""
result = []
URL = API_HOST + "/api/resources/header"
async with async_timeout.timeout(TIMEOUT):
response = await self.session.get(URL)
json_data = await response.json()
accounts = json_data["data"]["accounts"]["data"]["data"]
for account in accounts:
if account["statusCategory"] == STATUS_CATEGORY_OPEN:
result.append(account["accountNumber"])
return result
async def logout(self):
"""Logging out from fpl"""
_LOGGER.info("Logging out")
URL_LOGOUT = API_HOST + "/api/resources/logout"
try:
async with async_timeout.timeout(TIMEOUT):
await self.session.get(URL_LOGOUT)
except:
pass
async def update(self, account) -> dict:
"""Get data from resources endpoint"""
data = {}
URL_RESOURCES_ACCOUNT = API_HOST + "/api/resources/account/{account}"
async with async_timeout.timeout(TIMEOUT):
response = await self.session.get(
URL_RESOURCES_ACCOUNT.format(account=account)
)
account_data = (await response.json())["data"]
premise = account_data.get("premiseNumber").zfill(9)
data["meterSerialNo"] = account_data["meterSerialNo"]
# currentBillDate
currentBillDate = datetime.strptime(
account_data["currentBillDate"].replace("-", "").split("T")[0], "%Y%m%d"
).date()
# nextBillDate
nextBillDate = datetime.strptime(
account_data["nextBillDate"].replace("-", "").split("T")[0], "%Y%m%d"
).date()
data["current_bill_date"] = str(currentBillDate)
data["next_bill_date"] = str(nextBillDate)
today = datetime.now().date()
data["service_days"] = (nextBillDate - currentBillDate).days
data["as_of_days"] = (today - currentBillDate).days
data["remaining_days"] = (nextBillDate - today).days
# zip code
# zip_code = accountData["serviceAddress"]["zip"]
# projected bill
pbData = await self.__getFromProjectedBill(account, premise, currentBillDate)
data.update(pbData)
# programs
programsData = account_data["programs"]["data"]
programs = dict()
_LOGGER.info("Getting Programs")
for program in programsData:
if "enrollmentStatus" in program.keys():
key = program["name"]
programs[key] = program["enrollmentStatus"] == ENROLLED
def hasProgram(programName) -> bool:
return programName in programs and programs[programName]
# Budget Billing program
if hasProgram("BBL"):
data["budget_bill"] = True
bbl_data = await self.__getBBL_async(account, data)
data.update(bbl_data)
else:
data["budget_bill"] = False
# Get data from energy service
data.update(
await self.__getDataFromEnergyService(account, premise, currentBillDate)
)
# Get data from energy service ( hourly )
# data.update(
# await self.__getDataFromEnergyServiceHourly(
# account, premise, currentBillDate
# )
# )
data.update(await self.__getDataFromApplianceUsage(account, currentBillDate))
return data
async def __getFromProjectedBill(self, account, premise, currentBillDate) -> dict:
"""get data from projected bill endpoint"""
data = {}
try:
async with async_timeout.timeout(TIMEOUT):
response = await self.session.get(
URL_RESOURCES_PROJECTED_BILL.format(
account=account,
premise=premise,
lastBillDate=currentBillDate.strftime("%m%d%Y"),
)
)
if response.status == 200:
projectedBillData = (await response.json())["data"]
billToDate = float(projectedBillData["billToDate"])
projectedBill = float(projectedBillData["projectedBill"])
dailyAvg = float(projectedBillData["dailyAvg"])
avgHighTemp = int(projectedBillData["avgHighTemp"])
data["bill_to_date"] = billToDate
data["projected_bill"] = projectedBill
data["daily_avg"] = dailyAvg
data["avg_high_temp"] = avgHighTemp
except:
pass
return data
async def __getBBL_async(self, account, projectedBillData) -> dict:
"""Get budget billing data"""
_LOGGER.info("Getting budget billing data")
data = {}
try:
async with async_timeout.timeout(TIMEOUT):
response = await self.session.get(
URL_BUDGET_BILLING_PREMISE_DETAILS.format(account=account)
)
if response.status == 200:
r = (await response.json())["data"]
dataList = r["graphData"]
# startIndex = len(dataList) - 1
billingCharge = 0
budgetBillDeferBalance = r["defAmt"]
projectedBill = projectedBillData["projected_bill"]
asOfDays = projectedBillData["as_of_days"]
for det in dataList:
billingCharge += det["actuallBillAmt"]
calc1 = (projectedBill + billingCharge) / 12
calc2 = (1 / 12) * (budgetBillDeferBalance)
projectedBudgetBill = round(calc1 + calc2, 2)
bbDailyAvg = round(projectedBudgetBill / 30, 2)
bbAsOfDateAmt = round(projectedBudgetBill / 30 * asOfDays, 2)
data["budget_billing_daily_avg"] = bbDailyAvg
data["budget_billing_bill_to_date"] = bbAsOfDateAmt
data["budget_billing_projected_bill"] = float(projectedBudgetBill)
async with async_timeout.timeout(TIMEOUT):
response = await self.session.get(
URL_BUDGET_BILLING_GRAPH.format(account=account)
)
if response.status == 200:
r = (await response.json())["data"]
data["bill_to_date"] = float(r["eleAmt"])
data["defered_amount"] = float(r["defAmt"])
except:
pass
return data
async def __getDataFromEnergyService(
self, account, premise, lastBilledDate
) -> dict:
_LOGGER.info("Getting data from energy service")
date = str(lastBilledDate.strftime("%m%d%Y"))
JSON = {
"recordCount": 24,
"status": 2,
"channel": "WEB",
"amrFlag": "Y",
"accountType": "RESIDENTIAL",
"revCode": "1",
"premiseNumber": premise,
"projectedBillFlag": True,
"billComparisionFlag": True,
"monthlyFlag": True,
"frequencyType": "Daily",
"lastBilledDate": date,
"applicationPage": "resDashBoard",
}
URL_ENERGY_SERVICE = (
API_HOST
+ "/dashboard-api/resources/account/{account}/energyService/{account}"
)
data = {}
try:
async with async_timeout.timeout(TIMEOUT):
response = await self.session.post(
URL_ENERGY_SERVICE.format(account=account), json=JSON
)
if response.status == 200:
r = (await response.json())["data"]
dailyUsage = []
# totalPowerUsage = 0
if "data" in r["DailyUsage"]:
for daily in r["DailyUsage"]["data"]:
if (
"kwhUsed" in daily.keys()
and "billingCharge" in daily.keys()
and "date" in daily.keys()
and "averageHighTemperature" in daily.keys()
):
dailyUsage.append(
{
"usage": daily["kwhUsed"],
"cost": daily["billingCharge"],
# "date": daily["date"],
"max_temperature": daily[
"averageHighTemperature"
],
"netDeliveredKwh": daily["netDeliveredKwh"]
if "netDeliveredKwh" in daily.keys()
else 0,
"netReceivedKwh": daily["netReceivedKwh"]
if "netReceivedKwh" in daily.keys()
else 0,
"readTime": datetime.fromisoformat(
daily[
"readTime"
] # 2022-02-25T00:00:00.000-05:00
),
}
)
# totalPowerUsage += int(daily["kwhUsed"])
# data["total_power_usage"] = totalPowerUsage
data["daily_usage"] = dailyUsage
data["projectedKWH"] = r["CurrentUsage"]["projectedKWH"]
data["dailyAverageKWH"] = float(
r["CurrentUsage"]["dailyAverageKWH"]
)
data["billToDateKWH"] = float(r["CurrentUsage"]["billToDateKWH"])
data["recMtrReading"] = int(r["CurrentUsage"]["recMtrReading"] or 0)
data["delMtrReading"] = int(r["CurrentUsage"]["delMtrReading"] or 0)
data["billStartDate"] = r["CurrentUsage"]["billStartDate"]
except:
pass
return data
async def __getDataFromApplianceUsage(self, account, lastBilledDate) -> dict:
"""get data from appliance usage"""
_LOGGER.info("Getting data from appliance usage")
JSON = {"startDate": str(lastBilledDate.strftime("%m%d%Y"))}
data = {}
try:
async with async_timeout.timeout(TIMEOUT):
response = await self.session.post(
URL_APPLIANCE_USAGE.format(account=account), json=JSON
)
if response.status == 200:
electric = (await response.json())["data"]["electric"]
full = 100
for e in electric:
rr = round(float(e["percentageDollar"]))
if rr < full:
full = full - rr
else:
rr = full
data[e["category"].replace(" ", "_")] = rr
except:
pass
return {"energy_percent_by_applicance": data}

View File

@@ -0,0 +1,173 @@
"""FPL Northwest data collection api client"""
from datetime import datetime, timedelta
import logging
import async_timeout
import boto3
from .const import TIMEOUT, API_HOST
from .aws_srp import AWSSRP
from .const import LOGIN_RESULT_OK
USER_POOL_ID = "us-east-1_w09KCowou"
CLIENT_ID = "4k78t7970hhdgtafurk158dr3a"
ACCOUNT_STATUS_ACTIVE = "ACT"
_LOGGER = logging.getLogger(__package__)
class FplNorthwestRegionApiClient:
"""FPL Northwest Api client"""
def __init__(self, username, password, loop, session) -> None:
self.session = session
self.username = username
self.password = password
self.loop = loop
self.id_token = None
self.access_token = None
self.refresh_token = None
async def login(self):
"""login using aws"""
client = await self.loop.run_in_executor(
None, boto3.client, "cognito-idp", "us-east-1"
)
aws = AWSSRP(
username=self.username,
password=self.password,
pool_id=USER_POOL_ID,
client_id=CLIENT_ID,
loop=self.loop,
client=client,
)
tokens = await aws.authenticate_user()
if "AccessToken" in tokens["AuthenticationResult"]:
self.access_token = tokens["AuthenticationResult"]["AccessToken"]
self.refresh_token = tokens["AuthenticationResult"]["RefreshToken"]
self.id_token = tokens["AuthenticationResult"]["IdToken"]
# Get User
headers = {
"Content-Type": "application/x-amz-json-1.1",
"X-Amz-Target": "AWSCognitoIdentityProviderService.GetUser",
}
JSON = {"AccessToken": self.access_token}
async with async_timeout.timeout(TIMEOUT):
response = await self.session.post(
"https://cognito-idp.us-east-1.amazonaws.com/",
headers=headers,
json=JSON,
)
if response.status == 200:
data = await response.json(content_type="application/x-amz-json-1.1")
# InitiateAuth
headers = {
"Content-Type": "application/x-amz-json-1.1",
"X-Amz-Target": "AWSCognitoIdentityProviderService.InitiateAuth",
}
payload = {
"AuthFlow": "REFRESH_TOKEN_AUTH",
"AuthParameters": {
"DEVICE_KEY": None,
"REFRESH_TOKEN": self.refresh_token,
},
"ClientId": "4k78t7970hhdgtafurk158dr3a",
}
async with async_timeout.timeout(TIMEOUT):
response = await self.session.post(
"https://cognito-idp.us-east-1.amazonaws.com/",
headers=headers,
json=payload,
)
if response.status == 200:
data = await response.json(content_type="application/x-amz-json-1.1")
self.access_token = data["AuthenticationResult"]["AccessToken"]
self.id_token = tokens["AuthenticationResult"]["IdToken"]
return LOGIN_RESULT_OK
async def get_open_accounts(self):
"""
Returns the open accounts
"""
result = []
URL = API_HOST + "/cs/gulf/ssp/v1/profile/accounts/list"
headers = {"Authorization": f"Bearer {self.id_token}"}
async with async_timeout.timeout(TIMEOUT):
response = await self.session.get(URL, headers=headers)
if response.status == 200:
data = await response.json()
for account in data["accounts"]:
if account["accountStatus"] == ACCOUNT_STATUS_ACTIVE:
result.append(account["accountNumber"])
return result
async def logout(self):
"""log out from fpl"""
async def update(self, account):
"""
Returns the data collected from fpl
"""
URL = (
API_HOST
+ f"/cs/gulf/ssp/v1/accountservices/account/{account}/accountSummary?balance=y"
)
headers = {"Authorization": f"Bearer {self.id_token}"}
async with async_timeout.timeout(TIMEOUT):
response = await self.session.get(URL, headers=headers)
result = {}
if response.status == 200:
data = await response.json()
accountSumary = data["accountSummary"]["accountSummaryData"]
billAndMetterInfo = accountSumary["billAndMeterInfo"]
programInfo = accountSumary["programInfo"]
result["budget_bill"] = False
result["projected_bill"] = float(billAndMetterInfo["projBillAmount"] or 0)
result["projectedKWH"] = int(billAndMetterInfo["projBillKWH"] or 0)
result["bill_to_date"] = float(billAndMetterInfo["asOfDateAmount"] or 0)
result["billToDateKWH"] = int(billAndMetterInfo["asOfDateUsage"] or 0)
result["daily_avg"] = float(billAndMetterInfo["dailyAvgAmount"] or 0)
result["dailyAverageKWH"] = int(billAndMetterInfo["dailyAvgKwh"] or 0)
start = datetime.fromisoformat(programInfo["currentBillDate"])
# + timedelta(days=1)
end = datetime.fromisoformat(programInfo["nextBillDate"])
today = datetime.fromisoformat(data["today"])
# result["billStartDate"] = programInfo["currentBillDate"]
result["current_bill_date"] = start.strftime("%Y-%m-%d")
result["next_bill_date"] = programInfo["nextBillDate"]
service_days = (end - start).days
as_of_days = (today - start).days
result["service_days"] = service_days
result["as_of_days"] = as_of_days
result["remaining_days"] = service_days - as_of_days
return result

View File

@@ -9,16 +9,16 @@ from homeassistant.core import Config, HomeAssistant
from homeassistant.config_entries import ConfigEntry
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.util import Throttle
from homeassistant.const import CONF_USERNAME, CONF_PASSWORD
from .fplapi import FplApi
from .const import (
DOMAIN,
DOMAIN_DATA,
CONF_USERNAME,
CONF_PASSWORD,
PLATFORMS,
STARTUP_MESSAGE,
)
from .fplDataUpdateCoordinator import FplDataUpdateCoordinator
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=30)
@@ -63,7 +63,7 @@ async def async_setup_entry(hass, entry):
# Configure the client.
_LOGGER.info("Configuring the client")
session = async_get_clientsession(hass)
client = FplApi(username, password, session)
client = FplApi(username, password, session, hass.loop)
coordinator = FplDataUpdateCoordinator(hass, client=client)
await coordinator.async_refresh()

View File

@@ -0,0 +1,323 @@
"""AWS SRP"""
import os
import base64
import binascii
import datetime
import hashlib
import hmac
import re
import functools
import boto3
import six
from .exceptions import ForceChangePasswordException
# https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L22
n_hex = (
"FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD1"
+ "29024E088A67CC74020BBEA63B139B22514A08798E3404DD"
+ "EF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245"
+ "E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED"
+ "EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3D"
+ "C2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F"
+ "83655D23DCA3AD961C62F356208552BB9ED529077096966D"
+ "670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B"
+ "E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9"
+ "DE2BCBF6955817183995497CEA956AE515D2261898FA0510"
+ "15728E5A8AAAC42DAD33170D04507A33A85521ABDF1CBA64"
+ "ECFB850458DBEF0A8AEA71575D060C7DB3970F85A6E1E4C7"
+ "ABF5AE8CDB0933D71E8C94E04A25619DCEE3D2261AD2EE6B"
+ "F12FFA06D98A0864D87602733EC86A64521F2B18177B200C"
+ "BBE117577A615D6C770988C0BAD946E208E24FA074E5AB31"
+ "43DB5BFCE0FD108E4B82D120A93AD2CAFFFFFFFFFFFFFFFF"
)
# https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L49
g_hex = "2"
info_bits = bytearray("Caldera Derived Key", "utf-8")
def hash_sha256(buf):
"""AuthenticationHelper.hash"""
a = hashlib.sha256(buf).hexdigest()
return (64 - len(a)) * "0" + a
def hex_hash(hex_string):
return hash_sha256(bytearray.fromhex(hex_string))
def hex_to_long(hex_string):
return int(hex_string, 16)
def long_to_hex(long_num):
return "%x" % long_num
def get_random(nbytes):
random_hex = binascii.hexlify(os.urandom(nbytes))
return hex_to_long(random_hex)
def pad_hex(long_int):
"""
Converts a Long integer (or hex string) to hex format padded with zeroes for hashing
:param {Long integer|String} long_int Number or string to pad.
:return {String} Padded hex string.
"""
if not isinstance(long_int, six.string_types):
hash_str = long_to_hex(long_int)
else:
hash_str = long_int
if len(hash_str) % 2 == 1:
hash_str = "0%s" % hash_str
elif hash_str[0] in "89ABCDEFabcdef":
hash_str = "00%s" % hash_str
return hash_str
def compute_hkdf(ikm, salt):
"""
Standard hkdf algorithm
:param {Buffer} ikm Input key material.
:param {Buffer} salt Salt value.
:return {Buffer} Strong key material.
@private
"""
prk = hmac.new(salt, ikm, hashlib.sha256).digest()
info_bits_update = info_bits + bytearray(chr(1), "utf-8")
hmac_hash = hmac.new(prk, info_bits_update, hashlib.sha256).digest()
return hmac_hash[:16]
def calculate_u(big_a, big_b):
"""
Calculate the client's value U which is the hash of A and B
:param {Long integer} big_a Large A value.
:param {Long integer} big_b Server B value.
:return {Long integer} Computed U value.
"""
u_hex_hash = hex_hash(pad_hex(big_a) + pad_hex(big_b))
return hex_to_long(u_hex_hash)
class AWSSRP(object):
NEW_PASSWORD_REQUIRED_CHALLENGE = "NEW_PASSWORD_REQUIRED"
PASSWORD_VERIFIER_CHALLENGE = "PASSWORD_VERIFIER"
def __init__(
self,
username,
password,
pool_id,
client_id,
pool_region=None,
client=None,
client_secret=None,
loop=None,
):
if pool_region is not None and client is not None:
raise ValueError(
"pool_region and client should not both be specified "
"(region should be passed to the boto3 client instead)"
)
self.username = username
self.password = password
self.pool_id = pool_id
self.client_id = client_id
self.client_secret = client_secret
self.client = (
client if client else boto3.client("cognito-idp", region_name=pool_region)
)
self.big_n = hex_to_long(n_hex)
self.g = hex_to_long(g_hex)
self.k = hex_to_long(hex_hash("00" + n_hex + "0" + g_hex))
self.small_a_value = self.generate_random_small_a()
self.large_a_value = self.calculate_a()
self.loop = loop
def generate_random_small_a(self):
"""
helper function to generate a random big integer
:return {Long integer} a random value.
"""
random_long_int = get_random(128)
return random_long_int % self.big_n
def calculate_a(self):
"""
Calculate the client's public value A = g^a%N
with the generated random number a
:param {Long integer} a Randomly generated small A.
:return {Long integer} Computed large A.
"""
big_a = pow(self.g, self.small_a_value, self.big_n)
# safety check
if (big_a % self.big_n) == 0:
raise ValueError("Safety check for A failed")
return big_a
def get_password_authentication_key(self, username, password, server_b_value, salt):
"""
Calculates the final hkdf based on computed S value, and computed U value and the key
:param {String} username Username.
:param {String} password Password.
:param {Long integer} server_b_value Server B value.
:param {Long integer} salt Generated salt.
:return {Buffer} Computed HKDF value.
"""
u_value = calculate_u(self.large_a_value, server_b_value)
if u_value == 0:
raise ValueError("U cannot be zero.")
username_password = "%s%s:%s" % (self.pool_id.split("_")[1], username, password)
username_password_hash = hash_sha256(username_password.encode("utf-8"))
x_value = hex_to_long(hex_hash(pad_hex(salt) + username_password_hash))
g_mod_pow_xn = pow(self.g, x_value, self.big_n)
int_value2 = server_b_value - self.k * g_mod_pow_xn
s_value = pow(int_value2, self.small_a_value + u_value * x_value, self.big_n)
hkdf = compute_hkdf(
bytearray.fromhex(pad_hex(s_value)),
bytearray.fromhex(pad_hex(long_to_hex(u_value))),
)
return hkdf
def get_auth_params(self):
auth_params = {
"USERNAME": self.username,
"SRP_A": long_to_hex(self.large_a_value),
}
if self.client_secret is not None:
auth_params.update(
{
"SECRET_HASH": self.get_secret_hash(
self.username, self.client_id, self.client_secret
)
}
)
return auth_params
@staticmethod
def get_secret_hash(username, client_id, client_secret):
message = bytearray(username + client_id, "utf-8")
hmac_obj = hmac.new(bytearray(client_secret, "utf-8"), message, hashlib.sha256)
return base64.standard_b64encode(hmac_obj.digest()).decode("utf-8")
def process_challenge(self, challenge_parameters):
user_id_for_srp = challenge_parameters["USER_ID_FOR_SRP"]
salt_hex = challenge_parameters["SALT"]
srp_b_hex = challenge_parameters["SRP_B"]
secret_block_b64 = challenge_parameters["SECRET_BLOCK"]
# re strips leading zero from a day number (required by AWS Cognito)
timestamp = re.sub(
r" 0(\d) ",
r" \1 ",
datetime.datetime.utcnow().strftime("%a %b %d %H:%M:%S UTC %Y"),
)
hkdf = self.get_password_authentication_key(
user_id_for_srp, self.password, hex_to_long(srp_b_hex), salt_hex
)
secret_block_bytes = base64.standard_b64decode(secret_block_b64)
msg = (
bytearray(self.pool_id.split("_")[1], "utf-8")
+ bytearray(user_id_for_srp, "utf-8")
+ bytearray(secret_block_bytes)
+ bytearray(timestamp, "utf-8")
)
hmac_obj = hmac.new(hkdf, msg, digestmod=hashlib.sha256)
signature_string = base64.standard_b64encode(hmac_obj.digest())
response = {
"TIMESTAMP": timestamp,
"USERNAME": user_id_for_srp,
"PASSWORD_CLAIM_SECRET_BLOCK": secret_block_b64,
"PASSWORD_CLAIM_SIGNATURE": signature_string.decode("utf-8"),
}
if self.client_secret is not None:
response.update(
{
"SECRET_HASH": self.get_secret_hash(
self.username, self.client_id, self.client_secret
)
}
)
return response
async def authenticate_user(self, client=None):
"""authenticate user"""
boto_client = self.client or client
auth_params = self.get_auth_params()
response = await self.loop.run_in_executor(
None,
functools.partial(
boto_client.initiate_auth,
AuthFlow="USER_SRP_AUTH",
AuthParameters=auth_params,
ClientId=self.client_id,
),
)
if response["ChallengeName"] == self.PASSWORD_VERIFIER_CHALLENGE:
challenge_response = self.process_challenge(response["ChallengeParameters"])
tokens = await self.loop.run_in_executor(
None,
functools.partial(
boto_client.respond_to_auth_challenge,
ClientId=self.client_id,
ChallengeName=self.PASSWORD_VERIFIER_CHALLENGE,
ChallengeResponses=challenge_response,
),
)
# tokens = boto_client.respond_to_auth_challenge(
# ClientId=self.client_id,
# ChallengeName=self.PASSWORD_VERIFIER_CHALLENGE,
# ChallengeResponses=challenge_response,
# )
if tokens.get("ChallengeName") == self.NEW_PASSWORD_REQUIRED_CHALLENGE:
raise ForceChangePasswordException(
"Change password before authenticating"
)
return tokens
else:
raise NotImplementedError(
"The %s challenge is not supported" % response["ChallengeName"]
)
def set_new_password_challenge(self, new_password, client=None):
boto_client = self.client or client
auth_params = self.get_auth_params()
response = boto_client.initiate_auth(
AuthFlow="USER_SRP_AUTH",
AuthParameters=auth_params,
ClientId=self.client_id,
)
if response["ChallengeName"] == self.PASSWORD_VERIFIER_CHALLENGE:
challenge_response = self.process_challenge(response["ChallengeParameters"])
tokens = boto_client.respond_to_auth_challenge(
ClientId=self.client_id,
ChallengeName=self.PASSWORD_VERIFIER_CHALLENGE,
ChallengeResponses=challenge_response,
)
if tokens["ChallengeName"] == self.NEW_PASSWORD_REQUIRED_CHALLENGE:
challenge_response = {
"USERNAME": auth_params["USERNAME"],
"NEW_PASSWORD": new_password,
}
new_password_response = boto_client.respond_to_auth_challenge(
ClientId=self.client_id,
ChallengeName=self.NEW_PASSWORD_REQUIRED_CHALLENGE,
Session=tokens["Session"],
ChallengeResponses=challenge_response,
)
return new_password_response
return tokens
else:
raise NotImplementedError(
"The %s challenge is not supported" % response["ChallengeName"]
)

View File

@@ -7,16 +7,27 @@ from homeassistant import config_entries
from homeassistant.helpers.aiohttp_client import async_create_clientsession
from homeassistant.core import callback
from .const import DOMAIN, CONF_USERNAME, CONF_PASSWORD, CONF_NAME
from homeassistant.const import CONF_USERNAME, CONF_PASSWORD, CONF_NAME
from .fplapi import (
from .const import (
CONF_ACCOUNTS,
CONF_TERRITORY,
DEFAULT_CONF_PASSWORD,
DEFAULT_CONF_USERNAME,
DOMAIN,
LOGIN_RESULT_OK,
LOGIN_RESULT_FAILURE,
LOGIN_RESULT_INVALIDUSER,
LOGIN_RESULT_INVALIDPASSWORD,
FplApi,
)
from .fplapi import FplApi
try:
from .secrets import DEFAULT_CONF_PASSWORD, DEFAULT_CONF_USERNAME
except:
pass
@callback
def configured_instances(hass):
@@ -38,7 +49,7 @@ class FplFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
self._errors = {}
async def async_step_user(
self, user_input={}
self, user_input=None
): # pylint: disable=dangerous-default-value
"""Handle a flow initialized by the user."""
self._errors = {}
@@ -55,15 +66,19 @@ class FplFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
if username not in configured_instances(self.hass):
session = async_create_clientsession(self.hass)
api = FplApi(username, password, session)
api = FplApi(username, password, session, loop=self.hass.loop)
result = await api.login()
if result == LOGIN_RESULT_OK:
info = await api.get_basic_info()
accounts = await api.async_get_open_accounts()
accounts = info[CONF_ACCOUNTS]
# accounts = await api.async_get_open_accounts()
await api.logout()
user_input["accounts"] = accounts
user_input[CONF_ACCOUNTS] = accounts
user_input[CONF_TERRITORY] = info[CONF_TERRITORY]
return self.async_create_entry(title=username, data=user_input)
@@ -85,8 +100,8 @@ class FplFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
async def _show_config_form(self, user_input):
"""Show the configuration form to edit location data."""
username = ""
password = ""
username = DEFAULT_CONF_USERNAME
password = DEFAULT_CONF_PASSWORD
if user_input is not None:
if CONF_USERNAME in user_input:

View File

@@ -1,4 +1,8 @@
"""Constants for fpl."""
#
TIMEOUT = 5
API_HOST = "https://www.fpl.com"
# Base component constants
NAME = "FPL Integration"
DOMAIN = "fpl"
@@ -25,15 +29,6 @@ PLATFORMS = [SENSOR]
# Device classes
BINARY_SENSOR_DEVICE_CLASS = "connectivity"
# Configuration
CONF_BINARY_SENSOR = "binary_sensor"
CONF_SENSOR = "sensor"
CONF_SWITCH = "switch"
CONF_ENABLED = "enabled"
CONF_NAME = "name"
CONF_USERNAME = "username"
CONF_PASSWORD = "password"
# Defaults
DEFAULT_NAME = DOMAIN
@@ -47,3 +42,21 @@ If you have any issues with this you need to open an issue here:
{ISSUE_URL}
-------------------------------------------------------------------
"""
DEFAULT_CONF_USERNAME = ""
DEFAULT_CONF_PASSWORD = ""
# Api login result
LOGIN_RESULT_OK = "OK"
LOGIN_RESULT_INVALIDUSER = "NOTVALIDUSER"
LOGIN_RESULT_INVALIDPASSWORD = "FAILEDPASSWORD"
LOGIN_RESULT_UNAUTHORIZED = "UNAUTHORIZED"
LOGIN_RESULT_FAILURE = "FAILURE"
CONF_TERRITORY = "territory"
CONF_ACCOUNTS = "account"
FPL_MAINREGION = "FL01"
FPL_NORTHWEST = "FL02"

View File

@@ -0,0 +1,13 @@
"""exceptions file"""
class WarrantException(Exception):
"""Base class for all Warrant exceptions"""
class ForceChangePasswordException(WarrantException):
"""Raised when the user is forced to change their password"""
class TokenVerificationException(WarrantException):
"""Raised when token verification fails."""

View File

@@ -17,6 +17,8 @@ from .const import DOMAIN, VERSION, ATTRIBUTION
class FplEntity(CoordinatorEntity, SensorEntity):
"""FPL base entity"""
_attr_attribution = ATTRIBUTION
def __init__(self, coordinator, config_entry, account, sensorName):
super().__init__(coordinator)
self.config_entry = config_entry
@@ -26,9 +28,8 @@ class FplEntity(CoordinatorEntity, SensorEntity):
@property
def unique_id(self):
"""Return the ID of this device."""
return "{}{}{}".format(
DOMAIN, self.account, self.sensorName.lower().replace(" ", "")
)
sensorName = self.sensorName.lower().replace(" ", "")
return f"{DOMAIN}{self.account}{sensorName}"
@property
def name(self):
@@ -51,93 +52,53 @@ class FplEntity(CoordinatorEntity, SensorEntity):
def extra_state_attributes(self):
"""Return the state attributes."""
attributes = {
"attribution": ATTRIBUTION,
"integration": "FPL",
# "attribution": ATTRIBUTION,
# "integration": "FPL",
}
attributes.update(self.customAttributes())
return attributes
def getData(self, field):
"""call this method to retrieve sensor data"""
return self.coordinator.data.get(self.account).get(field, None)
if self.coordinator.data is not None:
account = self.coordinator.data.get(self.account)
if account is not None:
return account.get(field, None)
return None
class FplEnergyEntity(FplEntity):
"""Represents a energy sensor"""
@property
def state_class(self) -> str:
"""Return the state class of this entity, from STATE_CLASSES, if any."""
return STATE_CLASS_MEASUREMENT
_attr_native_unit_of_measurement = ENERGY_KILO_WATT_HOUR
# _attr_device_class = DEVICE_CLASS_ENERGY
_attr_icon = "mdi:flash"
@property
def last_reset(self) -> datetime:
def last_reset_not_use(self) -> datetime:
"""Return the time when the sensor was last reset, if any."""
today = datetime.today()
yesterday = today - timedelta(days=1)
return datetime.combine(yesterday, datetime.min.time())
@property
def device_class(self) -> str:
"""Return the class of this device, from component DEVICE_CLASSES."""
return DEVICE_CLASS_ENERGY
@property
def unit_of_measurement(self) -> str:
"""Return the unit of measurement of this entity, if any."""
return ENERGY_KILO_WATT_HOUR
@property
def icon(self):
return "mdi:flash"
class FplMoneyEntity(FplEntity):
"""Represents a money sensor"""
@property
def icon(self):
return "mdi:currency-usd"
@property
def device_class(self) -> str:
"""Return the class of this device, from component DEVICE_CLASSES."""
return DEVICE_CLASS_MONETARY
@property
def unit_of_measurement(self) -> str:
"""Return the unit of measurement of this entity, if any."""
return CURRENCY_DOLLAR
_attr_native_unit_of_measurement = CURRENCY_DOLLAR
_attr_device_class = DEVICE_CLASS_MONETARY
_attr_icon = "mdi:currency-usd"
class FplDateEntity(FplEntity):
"""Represents a date or days"""
# @property
# def device_class(self) -> str:
# """Return the class of this device, from component DEVICE_CLASSES."""
# return DEVICE_CLASS_DATE
@property
def icon(self):
return "mdi:calendar"
_attr_icon = "mdi:calendar"
class FplDayEntity(FplEntity):
"""Represents a date or days"""
# @property
# def device_class(self) -> str:
# """Return the class of this device, from component DEVICE_CLASSES."""
# return DEVICE_CLASS_DATE
@property
def icon(self):
return "mdi:calendar"
@property
def unit_of_measurement(self) -> str:
"""Return the unit of measurement of this entity, if any."""
return "days"
_attr_native_unit_of_measurement = "days"
_attr_icon = "mdi:calendar"

View File

@@ -1,478 +1,135 @@
"""Custom FPl api client"""
import logging
from datetime import datetime, timedelta
import sys
import json
import aiohttp
import logging
import async_timeout
STATUS_CATEGORY_OPEN = "OPEN"
# Api login result
LOGIN_RESULT_OK = "OK"
LOGIN_RESULT_INVALIDUSER = "NOTVALIDUSER"
LOGIN_RESULT_INVALIDPASSWORD = "FAILEDPASSWORD"
LOGIN_RESULT_UNAUTHORIZED = "UNAUTHORIZED"
LOGIN_RESULT_FAILURE = "FAILURE"
from .const import (
CONF_ACCOUNTS,
CONF_TERRITORY,
FPL_MAINREGION,
LOGIN_RESULT_FAILURE,
LOGIN_RESULT_OK,
TIMEOUT,
API_HOST,
)
from .FplMainRegionApiClient import FplMainRegionApiClient
from .FplNorthwestRegionApiClient import FplNorthwestRegionApiClient
_LOGGER = logging.getLogger(__package__)
TIMEOUT = 5
API_HOST = "https://www.fpl.com"
URL_LOGIN = API_HOST + "/api/resources/login"
URL_LOGOUT = API_HOST + "/api/resources/logout"
URL_RESOURCES_HEADER = API_HOST + "/api/resources/header"
URL_RESOURCES_ACCOUNT = API_HOST + "/api/resources/account/{account}"
URL_BUDGET_BILLING_GRAPH = (
API_HOST + "/api/resources/account/{account}/budgetBillingGraph"
)
URL_RESOURCES_PROJECTED_BILL = (
API_HOST
+ "/api/resources/account/{account}/projectedBill"
+ "?premiseNumber={premise}&lastBilledDate={lastBillDate}"
)
URL_ENERGY_SERVICE = (
API_HOST + "/dashboard-api/resources/account/{account}/energyService/{account}"
)
URL_APPLIANCE_USAGE = (
API_HOST + "/dashboard-api/resources/account/{account}/applianceUsage/{account}"
)
URL_BUDGET_BILLING_PREMISE_DETAILS = (
API_HOST + "/api/resources/account/{account}/budgetBillingGraph/premiseDetails"
)
ENROLLED = "ENROLLED"
NOTENROLLED = "NOTENROLLED"
URL_TERRITORY = API_HOST + "/cs/customer/v1/territoryid/public/territory"
class NoTerrytoryAvailableException(Exception):
"""Thrown when not possible to determine user territory"""
class FplApi:
"""A class for getting energy usage information from Florida Power & Light."""
def __init__(self, username, password, session):
def __init__(self, username, password, session, loop):
"""Initialize the data retrieval. Session should have BasicAuth flag set."""
self._username = username
self._password = password
self._session = session
self._loop = loop
self._territory = None
self.access_token = None
self.id_token = None
self.apiClient = None
async def getTerritory(self):
"""get territory"""
if self._territory is not None:
return self._territory
headers = {"userID": f"{self._username}", "channel": "WEB"}
async with async_timeout.timeout(TIMEOUT):
response = await self._session.get(URL_TERRITORY, headers=headers)
if response.status == 200:
json_data = json.loads(await response.text())
territoryArray = json_data["data"]["territory"]
if len(territoryArray) == 0:
raise NoTerrytoryAvailableException()
self._territory = territoryArray[0]
return territoryArray[0]
def isMainRegion(self):
"""Returns true if this account belongs to the main region, not northwest"""
return self._territory == FPL_MAINREGION
async def initialize(self):
"""initialize the api client"""
self._territory = await self.getTerritory()
# set the api client based on user's territory
if self.apiClient is None:
if self.isMainRegion():
self.apiClient = FplMainRegionApiClient(
self._username, self._password, self._loop, self._session
)
else:
self.apiClient = FplNorthwestRegionApiClient(
self._username, self._password, self._loop, self._session
)
async def get_basic_info(self):
"""returns basic info for sensor initialization"""
await self.initialize()
data = {}
data[CONF_TERRITORY] = self._territory
data[CONF_ACCOUNTS] = await self.apiClient.get_open_accounts()
return data
async def async_get_data(self) -> dict:
"""Get data from fpl api"""
await self.initialize()
data = {}
data["accounts"] = []
if await self.login() == LOGIN_RESULT_OK:
accounts = await self.async_get_open_accounts()
data[CONF_ACCOUNTS] = []
data["accounts"] = accounts
data[CONF_TERRITORY] = self._territory
login_result = await self.apiClient.login()
if login_result == LOGIN_RESULT_OK:
accounts = await self.apiClient.get_open_accounts()
data[CONF_ACCOUNTS] = accounts
for account in accounts:
account_data = await self.__async_get_data(account)
data[account] = account_data
data[account] = await self.apiClient.update(account)
await self.logout()
await self.apiClient.logout()
return data
async def login(self):
"""login into fpl"""
_LOGGER.info("Logging in")
# login and get account information
"""method to use in config flow"""
try:
async with async_timeout.timeout(TIMEOUT):
response = await self._session.get(
URL_LOGIN, auth=aiohttp.BasicAuth(self._username, self._password)
)
await self.initialize()
if response.status == 200:
_LOGGER.info("Logging Successful")
return LOGIN_RESULT_OK
_LOGGER.info("Logging in")
# login and get account information
if response.status == 401:
_LOGGER.error("Logging Unauthorized")
json_data = json.loads(await response.text())
if json_data["messageCode"] == LOGIN_RESULT_INVALIDUSER:
return LOGIN_RESULT_INVALIDUSER
if json_data["messageCode"] == LOGIN_RESULT_INVALIDPASSWORD:
return LOGIN_RESULT_INVALIDPASSWORD
return await self.apiClient.login()
except Exception as exception:
_LOGGER.error("Error %s : %s", exception, sys.exc_info()[0])
return LOGIN_RESULT_FAILURE
return LOGIN_RESULT_FAILURE
async def async_get_open_accounts(self):
"""return open accounts"""
self.initialize()
return await self.apiClient.get_open_accounts()
async def logout(self):
"""Logging out from fpl"""
_LOGGER.info("Logging out")
try:
async with async_timeout.timeout(TIMEOUT):
await self._session.get(URL_LOGOUT)
except Exception:
pass
async def async_get_open_accounts(self):
"""Getting open accounts"""
_LOGGER.info("Getting open accounts")
result = []
try:
async with async_timeout.timeout(TIMEOUT):
response = await self._session.get(URL_RESOURCES_HEADER)
json_data = await response.json()
accounts = json_data["data"]["accounts"]["data"]["data"]
for account in accounts:
if account["statusCategory"] == STATUS_CATEGORY_OPEN:
result.append(account["accountNumber"])
except Exception:
_LOGGER.error("Getting accounts %s", sys.exc_info())
return result
async def __async_get_data(self, account) -> dict:
"""Get data from resources endpoint"""
_LOGGER.info("Getting Data")
data = {}
async with async_timeout.timeout(TIMEOUT):
response = await self._session.get(
URL_RESOURCES_ACCOUNT.format(account=account)
)
account_data = (await response.json())["data"]
premise = account_data.get("premiseNumber").zfill(9)
data["meterSerialNo"] = account_data["meterSerialNo"]
# currentBillDate
currentBillDate = datetime.strptime(
account_data["currentBillDate"].replace("-", "").split("T")[0], "%Y%m%d"
).date()
# nextBillDate
nextBillDate = datetime.strptime(
account_data["nextBillDate"].replace("-", "").split("T")[0], "%Y%m%d"
).date()
data["current_bill_date"] = str(currentBillDate)
data["next_bill_date"] = str(nextBillDate)
today = datetime.now().date()
data["service_days"] = (nextBillDate - currentBillDate).days
data["as_of_days"] = (today - currentBillDate).days
data["remaining_days"] = (nextBillDate - today).days
# zip code
# zip_code = accountData["serviceAddress"]["zip"]
# projected bill
pbData = await self.__getFromProjectedBill(account, premise, currentBillDate)
data.update(pbData)
# programs
programsData = account_data["programs"]["data"]
programs = dict()
_LOGGER.info("Getting Programs")
for program in programsData:
if "enrollmentStatus" in program.keys():
key = program["name"]
programs[key] = program["enrollmentStatus"] == ENROLLED
def hasProgram(programName) -> bool:
return programName in programs and programs[programName]
# Budget Billing program
if hasProgram("BBL"):
data["budget_bill"] = True
bbl_data = await self.__getBBL_async(account, data)
data.update(bbl_data)
else:
data["budget_bill"] = False
# Get data from energy service
data.update(
await self.__getDataFromEnergyService(account, premise, currentBillDate)
)
# Get data from energy service ( hourly )
# data.update(
# await self.__getDataFromEnergyServiceHourly(
# account, premise, currentBillDate
# )
# )
data.update(await self.__getDataFromApplianceUsage(account, currentBillDate))
return data
async def __getFromProjectedBill(self, account, premise, currentBillDate) -> dict:
"""get data from projected bill endpoint"""
data = {}
try:
async with async_timeout.timeout(TIMEOUT):
response = await self._session.get(
URL_RESOURCES_PROJECTED_BILL.format(
account=account,
premise=premise,
lastBillDate=currentBillDate.strftime("%m%d%Y"),
)
)
if response.status == 200:
projectedBillData = (await response.json())["data"]
billToDate = float(projectedBillData["billToDate"])
projectedBill = float(projectedBillData["projectedBill"])
dailyAvg = float(projectedBillData["dailyAvg"])
avgHighTemp = int(projectedBillData["avgHighTemp"])
data["bill_to_date"] = billToDate
data["projected_bill"] = projectedBill
data["daily_avg"] = dailyAvg
data["avg_high_temp"] = avgHighTemp
except Exception:
pass
return data
async def __getBBL_async(self, account, projectedBillData) -> dict:
"""Get budget billing data"""
_LOGGER.info("Getting budget billing data")
data = {}
try:
async with async_timeout.timeout(TIMEOUT):
response = await self._session.get(
URL_BUDGET_BILLING_PREMISE_DETAILS.format(account=account)
)
if response.status == 200:
r = (await response.json())["data"]
dataList = r["graphData"]
# startIndex = len(dataList) - 1
billingCharge = 0
budgetBillDeferBalance = r["defAmt"]
projectedBill = projectedBillData["projected_bill"]
asOfDays = projectedBillData["as_of_days"]
for det in dataList:
billingCharge += det["actuallBillAmt"]
calc1 = (projectedBill + billingCharge) / 12
calc2 = (1 / 12) * (budgetBillDeferBalance)
projectedBudgetBill = round(calc1 + calc2, 2)
bbDailyAvg = round(projectedBudgetBill / 30, 2)
bbAsOfDateAmt = round(projectedBudgetBill / 30 * asOfDays, 2)
data["budget_billing_daily_avg"] = bbDailyAvg
data["budget_billing_bill_to_date"] = bbAsOfDateAmt
data["budget_billing_projected_bill"] = float(projectedBudgetBill)
except Exception as e:
_LOGGER.error("Error getting BBL: %s", e)
try:
async with async_timeout.timeout(TIMEOUT):
response = await self._session.get(
URL_BUDGET_BILLING_GRAPH.format(account=account)
)
if response.status == 200:
r = (await response.json())["data"]
data["bill_to_date"] = float(r["eleAmt"])
data["defered_amount"] = float(r["defAmt"])
except Exception as e:
_LOGGER.error("Error getting BBL: %s", e)
return data
async def __getDataFromEnergyService(
self, account, premise, lastBilledDate
) -> dict:
_LOGGER.info("Getting data from energy service")
date = str(lastBilledDate.strftime("%m%d%Y"))
JSON = {
"recordCount": 24,
"status": 2,
"channel": "WEB",
"amrFlag": "Y",
"accountType": "RESIDENTIAL",
"revCode": "1",
"premiseNumber": premise,
"projectedBillFlag": True,
"billComparisionFlag": True,
"monthlyFlag": True,
"frequencyType": "Daily",
"lastBilledDate": date,
"applicationPage": "resDashBoard",
}
data = {}
async with async_timeout.timeout(TIMEOUT):
response = await self._session.post(
URL_ENERGY_SERVICE.format(account=account), json=JSON
)
if response.status == 200:
r = (await response.json())["data"]
dailyUsage = []
# totalPowerUsage = 0
if "data" in r["DailyUsage"]:
for daily in r["DailyUsage"]["data"]:
if (
"kwhUsed" in daily.keys()
and "billingCharge" in daily.keys()
and "date" in daily.keys()
and "averageHighTemperature" in daily.keys()
):
dailyUsage.append(
{
"usage": daily["kwhUsed"],
"cost": daily["billingCharge"],
# "date": daily["date"],
"max_temperature": daily["averageHighTemperature"],
"netDeliveredKwh": daily["netDeliveredKwh"]
if "netDeliveredKwh" in daily.keys()
else 0,
"netReceivedKwh": daily["netReceivedKwh"]
if "netReceivedKwh" in daily.keys()
else 0,
"readTime": datetime.fromisoformat(
daily[
"readTime"
] # 2022-02-25T00:00:00.000-05:00
),
}
)
# totalPowerUsage += int(daily["kwhUsed"])
# data["total_power_usage"] = totalPowerUsage
data["daily_usage"] = dailyUsage
data["projectedKWH"] = r["CurrentUsage"]["projectedKWH"]
data["dailyAverageKWH"] = r["CurrentUsage"]["dailyAverageKWH"]
data["billToDateKWH"] = r["CurrentUsage"]["billToDateKWH"]
data["recMtrReading"] = r["CurrentUsage"]["recMtrReading"]
data["delMtrReading"] = r["CurrentUsage"]["delMtrReading"]
data["billStartDate"] = r["CurrentUsage"]["billStartDate"]
return data
async def __getDataFromEnergyServiceHourly(
self, account, premise, lastBilledDate
) -> dict:
_LOGGER.info("Getting data from energy service Hourly")
# date = str(lastBilledDate.strftime("%m%d%Y"))
date = str((datetime.now() - timedelta(days=1)).strftime("%m%d%Y"))
JSON = {
"status": 2,
"channel": "WEB",
"amrFlag": "Y",
"accountType": "RESIDENTIAL",
"revCode": "1",
"premiseNumber": premise,
"projectedBillFlag": False,
"billComparisionFlag": False,
"monthlyFlag": False,
"frequencyType": "Hourly",
"applicationPage": "resDashBoard",
"startDate": date,
}
data = {}
# now = homeassistant.util.dt.utcnow()
# now = datetime.now().astimezone()
# hour = now.hour
async with async_timeout.timeout(TIMEOUT):
response = await self._session.post(
URL_ENERGY_SERVICE.format(account=account), json=JSON
)
if response.status == 200:
r = (await response.json())["data"]
dailyUsage = []
# totalPowerUsage = 0
if "data" in r["HourlyUsage"]:
for daily in r["HourlyUsage"]["data"]:
if (
"kwhUsed" in daily.keys()
and "billingCharge" in daily.keys()
and "date" in daily.keys()
and "averageHighTemperature" in daily.keys()
):
dailyUsage.append(
{
"usage": daily["kwhUsed"],
"cost": daily["billingCharge"],
# "date": daily["date"],
"max_temperature": daily["averageHighTemperature"],
"netDeliveredKwh": daily["netDeliveredKwh"]
if "netDeliveredKwh" in daily.keys()
else 0,
"netReceivedKwh": daily["netReceivedKwh"]
if "netReceivedKwh" in daily.keys()
else 0,
"readTime": datetime.fromisoformat(
daily[
"readTime"
] # 2022-02-25T00:00:00.000-05:00
),
}
)
# totalPowerUsage += int(daily["kwhUsed"])
# data["total_power_usage"] = totalPowerUsage
data["daily_usage"] = dailyUsage
data["projectedKWH"] = r["HourlyUsage"]["projectedKWH"]
data["dailyAverageKWH"] = r["HourlyUsage"]["dailyAverageKWH"]
data["billToDateKWH"] = r["HourlyUsage"]["billToDateKWH"]
data["recMtrReading"] = r["HourlyUsage"]["recMtrReading"]
data["delMtrReading"] = r["HourlyUsage"]["delMtrReading"]
data["billStartDate"] = r["HourlyUsage"]["billStartDate"]
return data
async def __getDataFromApplianceUsage(self, account, lastBilledDate) -> dict:
"""get data from appliance usage"""
_LOGGER.info("Getting data from appliance usage")
JSON = {"startDate": str(lastBilledDate.strftime("%m%d%Y"))}
data = {}
try:
async with async_timeout.timeout(TIMEOUT):
response = await self._session.post(
URL_APPLIANCE_USAGE.format(account=account), json=JSON
)
if response.status == 200:
electric = (await response.json())["data"]["electric"]
full = 100
for e in electric:
rr = round(float(e["percentageDollar"]))
if rr < full:
full = full - rr
else:
rr = full
data[e["category"].replace(" ", "_")] = rr
except Exception:
pass
return {"energy_percent_by_applicance": data}
"""log out from fpl"""
return await self.apiClient.logout()

View File

@@ -15,6 +15,7 @@ from .sensor_DatesSensor import (
RemainingDaysSensor,
)
from .sensor_ProjectedBillSensor import (
BillToDateSensor,
FplProjectedBillSensor,
ProjectedBudgetBillSensor,
ProjectedActualBillSensor,
@@ -31,52 +32,70 @@ from .sensor_DailyUsageSensor import (
FplDailyDeliveredKWHSensor,
FplDailyReceivedKWHSensor,
)
from .const import DOMAIN
# from .TestSensor import TestSensor
from .const import CONF_ACCOUNTS, CONF_TERRITORY, DOMAIN, FPL_MAINREGION, FPL_NORTHWEST
ALL_REGIONS = [FPL_MAINREGION, FPL_NORTHWEST]
ONLY_MAINREGION = [FPL_MAINREGION]
sensors = {}
def registerSensor(sensor, regions):
"""register all available sensors"""
for region in regions:
if region in sensors:
sensors[region].append(sensor)
else:
sensors[region] = [sensor]
# bill sensors
registerSensor(FplProjectedBillSensor, ALL_REGIONS)
registerSensor(BillToDateSensor, ALL_REGIONS)
# budget billing
registerSensor(ProjectedBudgetBillSensor, ONLY_MAINREGION)
registerSensor(ProjectedActualBillSensor, ONLY_MAINREGION)
registerSensor(DeferedAmountSensor, ONLY_MAINREGION)
# usage sensors
registerSensor(DailyAverageSensor, ONLY_MAINREGION)
registerSensor(BudgetDailyAverageSensor, ONLY_MAINREGION)
registerSensor(ActualDailyAverageSensor, ONLY_MAINREGION)
registerSensor(FplDailyUsageSensor, ONLY_MAINREGION)
registerSensor(FplDailyUsageKWHSensor, ONLY_MAINREGION)
# date sensors
registerSensor(CurrentBillDateSensor, ALL_REGIONS)
registerSensor(NextBillDateSensor, ONLY_MAINREGION)
registerSensor(ServiceDaysSensor, ALL_REGIONS)
registerSensor(AsOfDaysSensor, ALL_REGIONS)
registerSensor(RemainingDaysSensor, ALL_REGIONS)
# KWH sensors
registerSensor(ProjectedKWHSensor, ALL_REGIONS)
registerSensor(DailyAverageKWHSensor, ONLY_MAINREGION)
registerSensor(BillToDateKWHSensor, ALL_REGIONS)
registerSensor(NetReceivedKWHSensor, ONLY_MAINREGION)
registerSensor(NetDeliveredKWHSensor, ONLY_MAINREGION)
registerSensor(FplDailyReceivedKWHSensor, ONLY_MAINREGION)
registerSensor(FplDailyDeliveredKWHSensor, ONLY_MAINREGION)
async def async_setup_entry(hass, entry, async_add_devices):
"""Setup sensor platform."""
accounts = entry.data.get("accounts")
accounts = entry.data.get(CONF_ACCOUNTS)
territory = entry.data.get(CONF_TERRITORY)
coordinator = hass.data[DOMAIN][entry.entry_id]
fpl_accounts = []
for account in accounts:
# Test Sensor
# fpl_accounts.append(TestSensor(coordinator, entry, account))
# bill sensors
fpl_accounts.append(FplProjectedBillSensor(coordinator, entry, account))
fpl_accounts.append(ProjectedBudgetBillSensor(coordinator, entry, account))
fpl_accounts.append(ProjectedActualBillSensor(coordinator, entry, account))
fpl_accounts.append(DeferedAmountSensor(coordinator, entry, account))
# usage sensors
fpl_accounts.append(DailyAverageSensor(coordinator, entry, account))
fpl_accounts.append(BudgetDailyAverageSensor(coordinator, entry, account))
fpl_accounts.append(ActualDailyAverageSensor(coordinator, entry, account))
fpl_accounts.append(FplDailyUsageSensor(coordinator, entry, account))
fpl_accounts.append(FplDailyUsageKWHSensor(coordinator, entry, account))
# date sensors
fpl_accounts.append(CurrentBillDateSensor(coordinator, entry, account))
fpl_accounts.append(NextBillDateSensor(coordinator, entry, account))
fpl_accounts.append(ServiceDaysSensor(coordinator, entry, account))
fpl_accounts.append(AsOfDaysSensor(coordinator, entry, account))
fpl_accounts.append(RemainingDaysSensor(coordinator, entry, account))
# KWH sensors
fpl_accounts.append(ProjectedKWHSensor(coordinator, entry, account))
fpl_accounts.append(DailyAverageKWHSensor(coordinator, entry, account))
fpl_accounts.append(BillToDateKWHSensor(coordinator, entry, account))
fpl_accounts.append(NetReceivedKWHSensor(coordinator, entry, account))
fpl_accounts.append(NetDeliveredKWHSensor(coordinator, entry, account))
fpl_accounts.append(FplDailyReceivedKWHSensor(coordinator, entry, account))
fpl_accounts.append(FplDailyDeliveredKWHSensor(coordinator, entry, account))
for sensor in sensors[territory]:
fpl_accounts.append(sensor(coordinator, entry, account))
async_add_devices(fpl_accounts)

View File

@@ -10,7 +10,7 @@ class DailyAverageSensor(FplMoneyEntity):
super().__init__(coordinator, config, account, "Daily Average")
@property
def state(self):
def native_value(self):
budget = self.getData("budget_bill")
budget_billing_projected_bill = self.getData("budget_billing_daily_avg")
@@ -22,7 +22,7 @@ class DailyAverageSensor(FplMoneyEntity):
def customAttributes(self):
"""Return the state attributes."""
attributes = {}
attributes["state_class"] = STATE_CLASS_TOTAL
# attributes["state_class"] = STATE_CLASS_TOTAL
return attributes
@@ -33,13 +33,13 @@ class BudgetDailyAverageSensor(FplMoneyEntity):
super().__init__(coordinator, config, account, "Budget Daily Average")
@property
def state(self):
def native_value(self):
return self.getData("budget_billing_daily_avg")
def customAttributes(self):
"""Return the state attributes."""
attributes = {}
attributes["state_class"] = STATE_CLASS_TOTAL
# attributes["state_class"] = STATE_CLASS_TOTAL
return attributes
@@ -50,11 +50,11 @@ class ActualDailyAverageSensor(FplMoneyEntity):
super().__init__(coordinator, config, account, "Actual Daily Average")
@property
def state(self):
def native_value(self):
return self.getData("daily_avg")
def customAttributes(self):
"""Return the state attributes."""
attributes = {}
attributes["state_class"] = STATE_CLASS_TOTAL
# attributes["state_class"] = STATE_CLASS_TOTAL
return attributes

View File

@@ -1,6 +1,10 @@
"""Daily Usage Sensors"""
from homeassistant.components.sensor import STATE_CLASS_TOTAL_INCREASING
from datetime import timedelta
from datetime import timedelta, datetime
from homeassistant.components.sensor import (
STATE_CLASS_TOTAL_INCREASING,
STATE_CLASS_TOTAL,
DEVICE_CLASS_ENERGY,
)
from .fplEntity import FplEnergyEntity, FplMoneyEntity
@@ -11,7 +15,7 @@ class FplDailyUsageSensor(FplMoneyEntity):
super().__init__(coordinator, config, account, "Daily Usage")
@property
def state(self):
def native_value(self):
data = self.getData("daily_usage")
if data is not None and len(data) > 0 and "cost" in data[-1].keys():
@@ -36,8 +40,11 @@ class FplDailyUsageKWHSensor(FplEnergyEntity):
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Daily Usage KWH")
_attr_state_class = STATE_CLASS_TOTAL_INCREASING
_attr_device_class = DEVICE_CLASS_ENERGY
@property
def state(self):
def native_value(self):
data = self.getData("daily_usage")
if data is not None and len(data) > 0 and "usage" in data[-1].keys():
@@ -45,6 +52,13 @@ class FplDailyUsageKWHSensor(FplEnergyEntity):
return None
@property
def last_reset(self) -> datetime | None:
data = self.getData("daily_usage")
date = data[-1]["readTime"]
last_reset = date - timedelta(days=1)
return last_reset
def customAttributes(self):
"""Return the state attributes."""
data = self.getData("daily_usage")
@@ -64,8 +78,10 @@ class FplDailyReceivedKWHSensor(FplEnergyEntity):
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Daily Received KWH")
# _attr_state_class = STATE_CLASS_TOTAL_INCREASING
@property
def state(self):
def native_value(self):
data = self.getData("daily_usage")
if data is not None and len(data) > 0 and "netReceivedKwh" in data[-1].keys():
return data[-1]["netReceivedKwh"]
@@ -78,20 +94,21 @@ class FplDailyReceivedKWHSensor(FplEnergyEntity):
last_reset = date - timedelta(days=1)
attributes = {}
attributes["state_class"] = STATE_CLASS_TOTAL_INCREASING
attributes["date"] = date
attributes["last_reset"] = last_reset
# attributes["last_reset"] = last_reset
return attributes
class FplDailyDeliveredKWHSensor(FplEnergyEntity):
"""daily delivered Kwh sensor"""
# _attr_state_class = STATE_CLASS_TOTAL_INCREASING
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Daily Delivered KWH")
@property
def state(self):
def native_value(self):
data = self.getData("daily_usage")
if data is not None and len(data) > 0 and "netDeliveredKwh" in data[-1].keys():
return data[-1]["netDeliveredKwh"]
@@ -104,7 +121,6 @@ class FplDailyDeliveredKWHSensor(FplEnergyEntity):
last_reset = date - timedelta(days=1)
attributes = {}
attributes["state_class"] = STATE_CLASS_TOTAL_INCREASING
attributes["date"] = date
attributes["last_reset"] = last_reset
# attributes["last_reset"] = last_reset
return attributes

View File

@@ -4,45 +4,55 @@ from .fplEntity import FplDateEntity, FplDayEntity
class CurrentBillDateSensor(FplDateEntity):
"""Current bill date sensor"""
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Current Bill Date")
@property
def state(self):
def native_value(self):
return datetime.date.fromisoformat(self.getData("current_bill_date"))
class NextBillDateSensor(FplDateEntity):
"""Next bill date sensor"""
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Next Bill Date")
@property
def state(self):
def native_value(self):
return datetime.date.fromisoformat(self.getData("next_bill_date"))
class ServiceDaysSensor(FplDayEntity):
"""Service days sensor"""
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Service Days")
@property
def state(self):
def native_value(self):
return self.getData("service_days")
class AsOfDaysSensor(FplDayEntity):
"""As of days sensor"""
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "As Of Days")
@property
def state(self):
def native_value(self):
return self.getData("as_of_days")
class RemainingDaysSensor(FplDayEntity):
"""Remaining days sensor"""
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Remaining Days")
@property
def state(self):
def native_value(self):
return self.getData("remaining_days")

View File

@@ -8,41 +8,47 @@ from .fplEntity import FplEnergyEntity
class ProjectedKWHSensor(FplEnergyEntity):
"""Projected KWH sensor"""
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Projected KWH")
@property
def state(self):
def native_value(self):
return self.getData("projectedKWH")
def customAttributes(self):
"""Return the state attributes."""
attributes = {}
attributes["state_class"] = STATE_CLASS_TOTAL
# attributes["state_class"] = STATE_CLASS_TOTAL
return attributes
class DailyAverageKWHSensor(FplEnergyEntity):
"""Daily Average KWH sensor"""
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Daily Average KWH")
@property
def state(self):
def native_value(self):
return self.getData("dailyAverageKWH")
def customAttributes(self):
"""Return the state attributes."""
attributes = {}
attributes["state_class"] = STATE_CLASS_TOTAL
# attributes["state_class"] = STATE_CLASS_TOTAL
return attributes
class BillToDateKWHSensor(FplEnergyEntity):
"""Bill To Date KWH sensor"""
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Bill To Date KWH")
@property
def state(self):
def native_value(self):
return self.getData("billToDateKWH")
def customAttributes(self):
@@ -54,36 +60,40 @@ class BillToDateKWHSensor(FplEnergyEntity):
last_reset = date.today() - timedelta(days=asOfDays)
attributes = {}
attributes["state_class"] = STATE_CLASS_TOTAL_INCREASING
attributes["last_reset"] = last_reset
# attributes["state_class"] = STATE_CLASS_TOTAL_INCREASING
# attributes["last_reset"] = last_reset
return attributes
class NetReceivedKWHSensor(FplEnergyEntity):
"""Received Meter Reading KWH sensor"""
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Received Meter Reading KWH")
@property
def state(self):
def native_value(self):
return self.getData("recMtrReading")
def customAttributes(self):
"""Return the state attributes."""
attributes = {}
attributes["state_class"] = STATE_CLASS_TOTAL_INCREASING
# attributes["state_class"] = STATE_CLASS_TOTAL_INCREASING
return attributes
class NetDeliveredKWHSensor(FplEnergyEntity):
"""Delivered Meter Reading KWH sensor"""
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Delivered Meter Reading KWH")
@property
def state(self):
def native_value(self):
return self.getData("delMtrReading")
def customAttributes(self):
"""Return the state attributes."""
attributes = {}
attributes["state_class"] = STATE_CLASS_TOTAL_INCREASING
# attributes["state_class"] = STATE_CLASS_TOTAL_INCREASING
return attributes

View File

@@ -7,7 +7,9 @@ from .fplEntity import FplMoneyEntity
class FplProjectedBillSensor(FplMoneyEntity):
"""projected bill sensor"""
"""Projected bill sensor"""
# _attr_state_class = STATE_CLASS_TOTAL
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Projected Bill")
@@ -22,22 +24,9 @@ class FplProjectedBillSensor(FplMoneyEntity):
return self.getData("projected_bill")
"""
@property
def state(self):
budget = self.getData("budget_bill")
budget_billing_projected_bill = self.getData("budget_billing_projected_bill")
if budget and budget_billing_projected_bill is not None:
return self.getData("budget_billing_projected_bill")
return self.getData("projected_bill")
"""
def customAttributes(self):
"""Return the state attributes."""
attributes = {}
attributes["state_class"] = STATE_CLASS_TOTAL
attributes["budget_bill"] = self.getData("budget_bill")
return attributes
@@ -46,51 +35,55 @@ class FplProjectedBillSensor(FplMoneyEntity):
class DeferedAmountSensor(FplMoneyEntity):
"""Defered amount sensor"""
# _attr_state_class = STATE_CLASS_TOTAL
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Defered Amount")
@property
def state(self):
def native_value(self):
if self.getData("budget_bill"):
return self.getData("defered_amount")
return 0
def customAttributes(self):
"""Return the state attributes."""
attributes = {}
attributes["state_class"] = STATE_CLASS_TOTAL
return attributes
class ProjectedBudgetBillSensor(FplMoneyEntity):
"""projected budget bill sensor"""
# _attr_state_class = STATE_CLASS_TOTAL
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Projected Budget Bill")
@property
def state(self):
def native_value(self):
return self.getData("budget_billing_projected_bill")
def customAttributes(self):
"""Return the state attributes."""
attributes = {}
attributes["state_class"] = STATE_CLASS_TOTAL
return attributes
class ProjectedActualBillSensor(FplMoneyEntity):
"""projeted actual bill sensor"""
# _attr_state_class = STATE_CLASS_TOTAL
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Projected Actual Bill")
@property
def state(self):
def native_value(self):
return self.getData("projected_bill")
def customAttributes(self):
"""Return the state attributes."""
attributes = {}
attributes["state_class"] = STATE_CLASS_TOTAL
return attributes
class BillToDateSensor(FplMoneyEntity):
"""projeted actual bill sensor"""
# _attr_state_class = STATE_CLASS_TOTAL
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Bill To Date")
@property
def native_value(self):
if self.getData("budget_bill"):
return self.getData("budget_billing_bill_to_date")
return self.getData("bill_to_date")

View File

@@ -0,0 +1,50 @@
"""Test Sensors"""
from datetime import timedelta, datetime
from homeassistant.components.sensor import (
STATE_CLASS_TOTAL_INCREASING,
DEVICE_CLASS_ENERGY,
)
from homeassistant.core import callback
from homeassistant.const import STATE_UNKNOWN
from .fplEntity import FplEnergyEntity
class TestSensor(FplEnergyEntity):
"""Daily Usage Kwh Sensor"""
def __init__(self, coordinator, config, account):
super().__init__(coordinator, config, account, "Test Sensor")
_attr_state_class = STATE_CLASS_TOTAL_INCREASING
_attr_device_class = DEVICE_CLASS_ENERGY
@property
def native_value(self):
data = self.getData("daily_usage")
if data is not None and len(data) > 0 and "usage" in data[-1].keys():
return data[-1]["usage"]
return STATE_UNKNOWN
@property
def last_reset(self) -> datetime | None:
last_reset = None
data = self.getData("daily_usage")
if len(data) > 0 and "readTime" in data[-1]:
date = data[-1]["readTime"]
last_reset = datetime.combine(date, datetime.min.time())
print(f"setting last reset {last_reset}")
return last_reset
def customAttributes(self):
"""Return the state attributes."""
print("setting custom attributes")
data = self.getData("daily_usage")
date = data[-1]["readTime"]
attributes = {}
attributes["date"] = date
last_reset = date - timedelta(days=1)
# attributes["last_reset"] = last_reset
return attributes