fastapi Server / mqtt client / rpyc - PWA

Python fastapi Server with mqtt client to receive data from MT681 device via tasmota SmartMeterReader and FritzBox SmartMeter devices (py_hab) #

This fastapi server provides access to a sqlite DB (sqlalchemy) that holds SmartMeter data received from an mqtt broker and SmartMeter data read from a FritzBox. The database contains the measurement data and the devices, and this data is made available via pydantic and fastapi.

In addition, the fastapi server starts a scheduler (apscheduler) in a background process. Background jobs read the data of the tasmota SmartMeter from bitShake, which is connected to the electricity meter, using an mqtt client, and also read the SmartMeter devices of the FritzBox. The data read is periodically written to the sqlite DB and is then available via the fastapi server. A reorganization job is also started that deletes redundant data and performs data aggregation or normalization.

To make the scheduler and job information available to all worker processes of the fastapi server, an rpyc server and client are implemented locally: the background process starts the scheduler and the rpyc server and makes this data available locally to the fastapi processes.

Installation - clone and pip #

You need a standard installation of Python on your system. Install this server as follows:

git clone https://github.com/jankstar/py_hab.git
cd py_hab
python3 -m venv env
source env/bin/activate
pip install -r requirements.txt
./start.sh

Environment - .env for defines.py #

The configuration values are defined in a '.env' file:

FRITZBOX_URL = "http://xxx.xxx.xxx.xxx"
USERNAME = "*******"
PASSWORD = "*******"
DATABASE = 'sqlite:///datenbank.db'
MQTT_SERVER = "xxx.xxx.xxx.xxx"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "*******"
SLEEP_TIME = 600
WAIT_TIME = 10
TIMEZONE = 'Europe/Berlin'

FRONTEND_URL= 'http://xxx.xxx.xxx.xxx:8000'
RPYX_PORT=8001

# -- SESSION --
# Secret phrase for session encryption
SESSION_SECRET=xxxxxxxxxxx

These values are read in defines.py, with a default for each missing entry:

FRITZBOX_URL = os.getenv('FRITZBOX_URL',"http://0.0.0.0")
USERNAME = os.getenv('USERNAME', "") 
PASSWORD = os.getenv('PASSWORD', "")
DATABASE = os.getenv('DATABASE','sqlite:///datenbank.db')
MQTT_SERVER = os.getenv('MQTT_SERVER',"0.0.0.0")
MQTT_PORT = int(os.getenv('MQTT_PORT',1883)) #1883
MQTT_CLIENT_ID = os.getenv('MQTT_CLIENT_ID', "TEST") 
SLEEP_TIME = int(os.getenv('SLEEP_TIME',600)) #600
WAIT_TIME = int(os.getenv('WAIT_TIME',10)) #10
TIMEZONE = os.getenv('TIMEZONE', 'Europe/Berlin')
SESSION_SECRET = os.getenv('SESSION_SECRET', 'secret')
FRONTEND_URL = os.getenv('FRONTEND_URL',"http://localhost:8000")
RPYX_PORT = int(os.getenv('RPYX_PORT',8001))
DEFAULT_LIMIT = 200
MODE = 'NO_DEBUG' #'DEBUG'
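
The os.getenv() calls above only see the values from '.env' if that file has been loaded into the process environment beforehand. How py_hab loads it is not shown here (python-dotenv is a common choice). The following is only a minimal sketch of the idea using the standard library; load_env_file() and the DEMO_ keys are hypothetical names for illustration and not part of the project.

```python
import os
import tempfile


def load_env_file(path):
    """Hypothetical helper: copy KEY = VALUE lines from a file into os.environ."""
    with open(path, encoding="utf-8") as handle:
        for raw_line in handle:
            line = raw_line.strip()
            # Skip blank lines and comments such as "# -- SESSION --"
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            value = value.strip()
            # Strip one pair of matching quotes around the value
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "'\"":
                value = value[1:-1]
            # Variables already set in the real environment take precedence
            os.environ.setdefault(key.strip(), value)


# Write a small example file, load it, then read it the way defines.py does
with tempfile.NamedTemporaryFile("w", suffix=".env", delete=False) as tmp:
    tmp.write("DEMO_MQTT_PORT = 1884\nDEMO_TIMEZONE = 'Europe/Berlin'\n# -- SESSION --\n")
load_env_file(tmp.name)
os.unlink(tmp.name)

demo_port = int(os.getenv("DEMO_MQTT_PORT", 1883))   # value from the file
demo_timezone = os.getenv("DEMO_TIMEZONE", "UTC")    # quotes are stripped
demo_missing = os.getenv("DEMO_MISSING", "fallback")  # default is used
```

Numeric settings arrive as strings, which is why defines.py wraps them in int().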

REST api Server - fastapi and pydantic #

Model.py defines the structures for the API: a sqlalchemy model for each DB table and a matching pydantic model that is filled automatically from the DB model.

class MeasurementData(Base):
    '''Measurement as sqlalchemy model'''
    __table__ = Table('measurement_data', Base.metadata,
        Column('id', String, primary_key=True), # key id, we use UUID
        Column('device', String),      # device
        Column('topic', String),       # topic of mqtt
        Column('entity', String),      # sensor
        Column('sub_entity', String),  # sensor sub device
        Column('time', String),        # timestamp of measurement
        Column('amount', Float),       # value of measurement
        # __table_args__ is ignored when __table__ is given,
        # so the index is declared inside the Table itself
        Index('idx_01', 'device', 'sub_entity', 'time'),
    )
    
    def __repr__(self):
        return f"<measurement(topic='{self.topic}', time={self.time}, device={self.device}, amount={self.amount}, )>"

class Measurement(BaseModel):
    '''Measurement as pydantic model'''
    id: str
    device: str
    topic: str
    entity: str
    sub_entity: str
    time: str
    amount: float

    model_config = ConfigDict(from_attributes=True)

class DeviceData(Base):
    '''Device as sqlalchemy model'''
    __table__ = Table('device_data', Base.metadata,
        Column('device', String, primary_key=True),     # device
        Column('sub_entity', String, primary_key=True), # sensor sub device
        Column('device_name', String),
        Column('product_name', String),
        Column('manufacturer', String),
        Column('firmware_version', String),
        Column('unit', String),               # unit of measurement
        Column('cumulative', Boolean),        # if measurement is cumulative
    )

    def __repr__(self):
        return f"<device(device={self.device}, device_name={self.device_name}, unit={self.unit})>"

class Device(BaseModel):
    '''Device as pydantic model'''
    device: str
    sub_entity: str
    device_name: str
    product_name: str
    manufacturer: str
    firmware_version: str
    unit: str
    cumulative: bool

    model_config = ConfigDict(from_attributes=True)    

A flag distinguishes cumulative measurements, e.g. the total energy consumption, whose value increases with each measurement. In contrast, power or temperature is the value at the exact time of the measurement. So we have a DB structure and a pydantic structure for the JSON response of the fastapi API.
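
The difference matters when the data is evaluated. As a minimal sketch (the function name and the sample readings are made up for illustration and are not taken from py_hab), cumulative readings have to be turned into differences before they say anything about a single interval, while instantaneous readings can be used directly:

```python
def to_interval_values(readings, cumulative):
    """Return one value per measurement interval.

    readings:   list of (time, amount) tuples sorted by time
    cumulative: the flag stored in the 'cumulative' column of the device
    """
    if not cumulative:
        # Instantaneous values (power, temperature) are used as they are
        return list(readings)
    # Cumulative values (meter counter): the consumption of an interval is
    # the difference between two consecutive readings
    return [
        (later_time, later_amount - earlier_amount)
        for (_, earlier_amount), (later_time, later_amount)
        in zip(readings, readings[1:])
    ]


meter = [("10:00", 1200.0), ("10:10", 1200.5), ("10:20", 1201.5)]
temperature = [("10:00", 21.0), ("10:10", 21.5)]

consumption = to_interval_values(meter, cumulative=True)
# -> [("10:10", 0.5), ("10:20", 1.0)]
unchanged = to_interval_values(temperature, cumulative=False)
```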

Database and sqlalchemy #

We use a sqlite DB with the synchronous driver, so the engine and the session maker are synchronous as well.

def get_db_session_sync():
    '''get_db_session    start and get database Session'''
    return sessionmaker(bind=get_db_sync())

def get_db_sync():
    '''get_db_sync    start and get database connection'''
    return create_engine(DATABASE,
                         pool_size=20,
                         max_overflow=30)

The DATABASE value comes from .env via DATABASE = os.getenv('DATABASE','sqlite:///datenbank.db'). The function get_db_session_sync() is injected into the endpoint function as db: Session = Depends(get_db_session_sync). In the fastapi endpoint we run the query to fetch the data and convert it to the response_model.

@router.get("/api/mesurement/{device}/{sub_entity}", response_model=list[Measurement])
async def get_measurement_by_device_sub_entity(device: str, sub_entity: str,limit: int = DEFAULT_LIMIT, offset: int = 0, db: Session = Depends(get_db_session_sync)):
    '''get_measurement get measurement data by device and sub_entity from database'''

    try:
        session = db()
        last_data = session.scalars(select(MeasurementData)
                                    .where(and_(MeasurementData.device == device, MeasurementData.sub_entity == sub_entity))
                                    .order_by(MeasurementData.time.desc())
                                    .offset(offset)
                                    .limit(limit)).all()
        session.close()
        result = [Measurement.model_validate(data) for data in last_data]
    except Exception as e:
        logger.error("Error while getting mesurement by device and sub_entity: " + str(e))
        raise HTTPException(status_code=404, detail="No Items found")

    return result

The line result = [Measurement.model_validate(data) for data in last_data] converts the rows from the DB into pydantic objects, which fastapi then returns as JSON according to response_model=list[Measurement].
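
The paging done by limit and offset can be tried without fastapi or sqlalchemy. The following sketch uses the sqlite3 module from the standard library and hand-written SQL that roughly corresponds to the select() statement of the endpoint; the table content (device "meter", topic, timestamps) is made-up sample data, and the ordering assumes ISO-like timestamp strings, which sort chronologically.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE measurement_data ("
    "id TEXT PRIMARY KEY, device TEXT, topic TEXT, entity TEXT, "
    "sub_entity TEXT, time TEXT, amount REAL)"
)
# Five made-up measurements, one per minute
rows = [
    (str(i), "meter", "demo/topic", "demo_entity", "power",
     f"2024-12-27T10:{i:02d}:00", float(i))
    for i in range(5)
]
conn.executemany("INSERT INTO measurement_data VALUES (?,?,?,?,?,?,?)", rows)


def fetch_page(conn, device, sub_entity, limit, offset):
    """Newest measurements first, 'limit' rows starting at row 'offset'."""
    cursor = conn.execute(
        "SELECT time, amount FROM measurement_data "
        "WHERE device = ? AND sub_entity = ? "
        "ORDER BY time DESC LIMIT ? OFFSET ?",
        (device, sub_entity, limit, offset),
    )
    return cursor.fetchall()


first_page = fetch_page(conn, "meter", "power", limit=2, offset=0)
second_page = fetch_page(conn, "meter", "power", limit=2, offset=2)
no_match = fetch_page(conn, "other", "power", limit=2, offset=0)
```

A query without matching rows returns an empty list, not an error.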

Scheduler - apscheduler and rpyc #

We run the apscheduler scheduler behind an rpyc server. If the fastapi server is started via uvicorn with several workers, only one scheduler may run, and all workers need access to it. The solution is a stand-alone rpyc server for the apscheduler. The fastapi lifespan function tries to take a file lock with fcntl.lockf(), and only the worker that obtains the lock starts the task for the server.

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Code for Startup
    app.server = None
    app.scheduler = None
    app.scheduler_task = None
    try:
        #we need only one scheduler

        # +++++++
        _ = open("/tmp/fastapi.lock","w")
        _fd = _.fileno()
        fcntl.lockf(_fd,fcntl.LOCK_EX|fcntl.LOCK_NB)
        # +++++++

        app.scheduler_task =  asyncio.create_task(background_process(app))

    except BlockingIOError:
        # another worker already holds the lock and runs the scheduler
        pass

    app.logger.info("Server starts...")

    yield
    # Code  for Shutdown
    if app.server is not None: app.server.close()
    if app.scheduler is not None: app.scheduler.shutdown()
    if app.scheduler_task is not None: app.scheduler_task.cancel(msg='Shutting down...')
    app.logger.info("Server shuts down...")
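
The lock in lifespan() can be tried in isolation. The sketch below is Unix-only (fcntl, os.fork); try_acquire_lock() and other_process_gets_lock() are hypothetical helper names, and a forked child stands in for a second worker. It catches OSError, because BlockingIOError is a subclass of it and the exact error can differ between systems.

```python
import fcntl
import os
import tempfile


def try_acquire_lock(path):
    """Return an open file holding an exclusive lock, or None if it is taken."""
    handle = open(path, "w")
    try:
        fcntl.lockf(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        # Lock is held by another process (BlockingIOError on Linux)
        handle.close()
        return None
    return handle


def other_process_gets_lock(path):
    """Fork a child (standing in for a second worker) and report its result."""
    pid = os.fork()
    if pid == 0:
        # Child process: exit code 0 means "lock acquired"
        os._exit(0 if try_acquire_lock(path) is not None else 1)
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status) == 0


lock_path = os.path.join(tempfile.mkdtemp(), "demo.lock")
holder = try_acquire_lock(lock_path)              # first worker wins
blocked = other_process_gets_lock(lock_path)      # second worker is refused
holder.close()                                    # closing the file releases the lock
free_again = other_process_gets_lock(lock_path)   # now another process can take it
```

The lock lives only as long as the file stays open in the process that took it, so the worker that runs the scheduler has to keep the file object alive for its whole lifetime.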

The call asyncio.create_task(background_process(app)) starts the server in the background. The rpyc server is quite simple: we define a service class for the remote access from the async fastapi endpoint.

async def background_process(app: FastAPI):
    '''background_process() starts the scheduler and the rpyc-server'''
    app.logger.info("background_process() started")

    app.scheduler = BackgroundScheduler()
    app.scheduler.configure(
        misfire_grace_time=300,  # 5 minutes
        coalesce=True
    )
    
    # Add background task
    app.scheduler.add_job(periodic_task, 'date', id='periodic_task_1', run_date=datetime.now() + timedelta(seconds=3))
    app.scheduler.add_job(periodic_task, 'interval', id='periodic_task_2', seconds=SLEEP_TIME)
    app.scheduler.add_job(reorganisation_task, 'interval', id='reorganisation_task', seconds=60*60*24)
    app.scheduler.start()

    protocol_config = {"allow_public_attrs": True, "allow_pickle": True}
    app.server = ThreadedServer(
       SchedulerService, port=RPYX_PORT, protocol_config=protocol_config
    )

    try:
        app.server._start_in_thread()
        #blocking server start
        while True:
            await asyncio.sleep(1)
    except Exception as e:
        app.logger.error("Error while Server stopped with: "+str(e))  
       
    app.server.close()
    app.server = None

    app.scheduler.shutdown()
    app.scheduler = None

If jobs are to be started remotely, we need {"allow_public_attrs": True, "allow_pickle": True}; otherwise only string parameters will work. In this example we start fixed jobs: the first runs the mqtt client and reads the FritzBox, the second cleans up redundant data and performs periodic summarizations.
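
What reorganisation_task() does in detail is not shown here. Purely as an illustration, and under the assumption that "redundant" means consecutive readings with an unchanged value and that a summary is an hourly mean, the two steps could look like the following sketch. Both function names and the sample data are hypothetical.

```python
from collections import defaultdict
from statistics import mean


def drop_unchanged(readings):
    """Keep a reading only if its amount differs from the previously kept one.

    readings: list of (time, amount) tuples sorted by time
    """
    kept = []
    for time, amount in readings:
        if not kept or kept[-1][1] != amount:
            kept.append((time, amount))
    return kept


def hourly_mean(readings):
    """Average the amounts per hour, assuming ISO-like timestamp strings."""
    buckets = defaultdict(list)
    for time, amount in readings:
        buckets[time[:13]].append(amount)  # key is "YYYY-MM-DDTHH"
    return {hour: mean(values) for hour, values in sorted(buckets.items())}


readings = [
    ("2024-12-27T10:00:00", 21.0),
    ("2024-12-27T10:10:00", 21.0),  # same value as before
    ("2024-12-27T10:20:00", 24.0),
    ("2024-12-27T11:00:00", 20.0),
]
deduplicated = drop_unchanged(readings)
summary = hourly_mean(readings)
```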

The fastapi endpoint /jobs fetches the jobs via remote access.

@app.get("/jobs")
async def get_jobs():
    jobs = []
    try:
        conn = rpyc.connect("localhost", RPYX_PORT)
        conn_jobs  = conn.root.get_jobs() 
        jobs = copy.deepcopy(conn_jobs) # deep copy required for build response
        conn.close() 
    except Exception as e:
        app.logger.error("Error while getting jobs: " + str(e))
        raise HTTPException(status_code=404, detail="No Items found")
        

    return [{"id": f"{obj.id}", "func": f"{obj.func.__name__}", "trigger": f"{obj.trigger}", "next_run_time": f"{obj.next_run_time}"} for obj in jobs]

mqtt client #

The sensor sends its measured values every 6 seconds from the tasmota server to the mqtt broker. The background job periodic_task() is started every 10 minutes (SLEEP_TIME = 600) by the apscheduler scheduler and then listens for 10 seconds (WAIT_TIME = 10) to receive at least one measurement. The received data is then stored in the SQLite DB.

def periodic_task():
    '''
    start mqtt client and save data to database one time
    '''

    logger = logging.getLogger("py_hab")
    engine = get_db_sync()
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)

   
    mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,protocol=mqtt.MQTTv5, client_id=MQTT_CLIENT_ID)
    properties = Properties(PacketTypes.CONNECT)
    properties.SessionExpiryInterval = 3600  # Sets the session runtime to 1 hour

    userdata = {
        'engine': engine,
        'session': Session,
        'logger': logger
    }

    mqttc._userdata = userdata
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message

    try:
          
        mqttc.connect(MQTT_SERVER, MQTT_PORT, 60, clean_start=False, properties=properties)
        mqttc.loop_start()
        time.sleep(WAIT_TIME)  # if mqtt does not buffer, all measurement messages are read from these 10 sec 
        mqttc.disconnect()
        mqttc.loop_stop()
        if (USERNAME != "" and PASSWORD != ""):
            logger.info("Check FritzBox ...")
            connect_fritzbox(userdata)



    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        mqttc.disconnect()
        mqttc.loop_stop()

Only in the period between mqttc.loop_start() and mqttc.loop_stop() are the messages from the broker received and saved in the database. The tasmota server only sends qos=0 messages that are not buffered. Two event functions are registered: on_connect() for establishing the connection and on_message() for processing the messages. The structure _userdata is passed to the event functions and contains the DB session maker and the logger.
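
Because unbuffered messages are only seen while the client is listening, WAIT_TIME has to be at least as long as the send interval. A small idealized calculation (it ignores the time needed to connect and subscribe; the function name is made up for illustration) shows how many messages fall into the listening window for the values used here:

```python
def messages_in_window(period, window, first_arrival):
    """Count messages that arrive at first_arrival, first_arrival + period, ...
    and fall inside the listening window [0, window)."""
    count = 0
    arrival = first_arrival
    while arrival < window:
        count += 1
        arrival += period
    return count


PERIOD = 6      # seconds between two tasmota messages
WAIT_TIME = 10  # seconds the client listens

# The first message can arrive 0..5 seconds after the client starts listening
counts = [messages_in_window(PERIOD, WAIT_TIME, phase) for phase in range(PERIOD)]
# -> [2, 2, 2, 2, 1, 1]

# With a window shorter than the period, a run can end without any message
too_short = [messages_in_window(PERIOD, 4, phase) for phase in range(PERIOD)]
```

So with a 6 second interval and a 10 second window every run sees one or two measurements.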

FritzBox #

The code is from the FritzBox forum. We need a user/password on the FritzBox with SmartMeter credentials. The temperature from the radiator controller is read and stored in the DB. The battery status and the opening or closing of the window contacts are also stored.

(2024/12/27)