Aprende qué es MQTT y crea un cliente con Angular 13 🅰

Te explico qué es MQTT y te enseño a desarrollar un cliente bajo este protocolo probándolo con un broker en la nube

13.08.2022 a las 18:21

Aprende qué es MQTT y crea un cliente con Angular 13 🅰

⚫ ¿Qué es MQTT?

🟣 Arquitectura

🟣 Modelo Publicador/Subscriptor

🟣 Los topics en MQTT

🟣 Tipos de publicaciones

⚫ Tipos de broker MQTT

🟣 En la nube

🟣 En local

⚫ Cliente MQTT con Angular 13

🟣 Código en mi GitHub

🟣 app.module.ts

🟣 Servicio MQTT

🟣 app.component.html

🟣 app.component.ts

Aprende qué es MQTT y crea un cliente con Angular 13 🅰

Te explico qué es MQTT y te enseño a desarrollar un cliente bajo este protocolo probándolo con un broker en la nube

¿Qué es MQTT?

Esto te voy a enseñar a programar 👇, dale clic para probarlo.

Pero antes, un poquito de teoría, MQTT es un acrónimo que viene de Message Queuing Telemetry Transport.


MQTT es un protocolo de conectividad utilizado fundamentalmente en el mundo de IoT para comunicarse entre máquinas.


Es un protocolo de alto nivel y se puede usar a través de una conexión WebSockets, por lo que podemos usar MQTT fácilmente en cualquier navegador web que admita WebSockets.

Arquitectura

La arquitectura de un ecosistema que usa el protocolo MQTT es:

  • Publicador; es el responsable de publicar mensajes MQTT, normalmente es un dispositivo IoT como una bombilla, un termómetro...
  • Subscriptor; es un dispositivo que desea escuchar mensajes de algún o algunos dispositivos.
  • Broker MQTT; es el servidor que recibe los datos de los publicadores y los envía a los subscriptores.

Modelo Publicador/Subscriptor

Como puedes deducir del dibujo de arriba, MQTT usa el patrón publicador/subscriptor. Este patrón permite el uso de varios subscriptores por lo que varios clientes pueden crear una conexión MQTT y suscribirse a datos desde un solo dispositivo.

Los topics en MQTT

Un topic (tema) MQTT es el concepto utilizado para la comunicación entre publicadores y suscriptores. Cuando un suscriptor quiere obtener datos de un dispositivo, se suscribe a un tema (topic) específico, que será donde el dispositivo publique sus datos. Un topic es una cadena UTF-8 jerárquica como por ejemplo /miCasa/habitacionAdriana/temperatura

Tipos de publicaciones

  • QoS=0, como máximo una vez; el mensaje se entrega como máximo una vez, o no se entrega.


    No se efectúa acuse de recibo la entrega del mensaje por la red. El mensaje no se almacena. El mensaje puede perderse si el cliente se desconecta o si falla el servidor.


    QoS=0 es la modalidad de transferencia más rápida. Se denomina a veces "transmitir y olvidar". El protocolo MQTT no necesita que los servidores reenvíen publicaciones con QoS=0 a un cliente. Si el cliente está desconectado cuando el servidor recibe la publicación, es posible que se descarte la publicación, dependiendo del servidor.


    El servicio de telemetría (MQXR) no descarta los mensajes enviados con QoS=0. Se almacenan como mensajes no persistentes y sólo se rechazan si se detiene el gestor de colas.


  • QoS=1 al menos una vez; QoS=1 es el valor predeterminado la modalidad de transferencia. El mensaje siempre se entrega, como mínimo, una vez.


    Si el emisor no recibe un acuse de recibo, el mensaje se envía de nuevo con el distintivo DUP establecido hasta que se reciba un acuse de recibo. Como resultado, un mismo mensaje se puede enviar varias veces al receptor y ser procesado varias veces.


    El mensaje se debe almacenar localmente en el emisor y el receptor hasta que se procese. El mensaje se suprime del receptor después de que se haya procesado. Si el receptor es un intermediario, el mensaje se publica a sus suscriptores. Si el receptor es un cliente, el mensaje se entrega a la aplicación de suscriptor.


    Una vez suprimido el mensaje, el receptor envía un acuse de recibo al emisor. El mensaje se suprime del emisor después de que se haya recibido el acuse de recibo por parte del receptor.


  • QoS=2 exactamente una vez; el mensaje se entrega siempre exactamente una vez.


    El mensaje debe almacenarse localmente en el emisor y el receptor hasta que se procese. QoS=2 es la modalidad de transferencia más segura, pero la más lenta.


    Deben realizarse como mínimo dos pares de transmisiones entre el emisor y el receptor antes de que el mensaje pueda suprimirse de la parte del emisor. El mensaje puede procesarse en el receptor tras la primera transmisión. En el primer par de transmisiones, el emisor transmite el mensaje y obtiene acuse de recibo del receptor que ha almacenado el mensaje.


    Si el emisor no recibe un acuse de recibo, el mensaje se envía de nuevo con el distintivo DUP establecido hasta que se reciba un acuse de recibo. En el segundo par de transmisiones, el emisor comunica al receptor que puede completar el proceso del mensaje, “PUBREL”. Si el emisor no recibe un acuse de recibo del mensaje “PUBREL”, el mensaje “PUBREL” se envía de nuevo hasta que se recibe un acuse de recibo.


    El emisor suprime el mensaje que ha guardado al recibir el acuse de recibo para el mensaje “PUBREL” El receptor puede procesar el mensaje proporcionado en la primera o segunda fase, no tiene que volver a procesarlo. Si el receptor es un intermediario, publica el mensaje a los suscriptores. Si el receptor es un cliente, el mensaje se entrega a la aplicación de suscriptor.


    El receptor devuelve un mensaje de finalización al emisor para comunicarle que ha terminado de procesar el mensaje.

Tipos de broker MQTT

En la nube

A continuación, te digo los que he probado en la nube, son gratuitos y públicos por lo que no debes enviar datos privados ya que serán visibles por el resto de usuarios.

  • MOSQUITTO.ORG

    Tiene todas las combinaciones que necesitas, TCP, TLS, websockets, con y sin encriptar y con y sin autenticación.

  • MQTTHQ; además de gratuito y público también es de alta disponibilidad. Sólo he conseguido probar en una conexión sin cifrar (ws);

  • HiveMQ; sólo he conseguido hacer pruebas sin cifrar (ws).

En local

En local sin duda el rey es mosquitto, es seguro, de código abierto y con un buen rendimiento.


Tengo pendiente hacer un artículo desplegando un broker MQTT mosquitto sobre docker en un servidor en la nube.

Cliente MQTT con Angular 13 🅰

Vamos al turrón


Nos vamos a apoyar en la librería ngx-mqtt.

Para instalarla 👇

npm install ngx-mqtt --save

Código en mi GitHub

Si quieres ir al grano puedes clonar mi repositorio de GitHub y en minutos lo estás probando 🚀

app.module.ts

  import { NgModule } from '@angular/core';
  import { BrowserModule } from '@angular/platform-browser';
  import { FormsModule, ReactiveFormsModule } from '@angular/forms';
  
  import { AppComponent } from './app.component';
  import { environment } from 'src/environments/environment';
  
  //MQTT
  import { IMqttServiceOptions, MqttModule } from "ngx-mqtt";
  
  const MQTT_SERVICE_OPTIONS: IMqttServiceOptions = {
    hostname: environment.mqtt.server,
    port: environment.mqtt.port,
    protocol: (environment.mqtt.protocol === "wss") ? "wss" : "ws",
    path: '/mqtt'
  };
  
  @NgModule({
    declarations: [
      AppComponent
    ],
    imports: [
      BrowserModule, FormsModule, ReactiveFormsModule,MqttModule.forRoot(MQTT_SERVICE_OPTIONS),
    ],
    providers: [],
    bootstrap: [AppComponent]
  })
  export class AppModule { }
  

Servicio MQTT

Me creo un servicio para poder utilizar la lógica en otros componentes.

import { Injectable } from '@angular/core';
import { IMqttMessage, MqttConnectionState, MqttService, IMqttServiceOptions } from 'ngx-mqtt';
import { Observable } from 'rxjs';
import { formatHour } from '../helpers/date.helper';

@Injectable({
  providedIn: 'root'
})
export class MyMqttService {

  

  constructor(private ngxMqttService: MqttService) { 
  
  }

  public connect(host:string, port:number):Observable<MqttConnectionState>{
    const MQTT_SERVICE_OPTIONS: IMqttServiceOptions = {
      hostname: host,
      port,
      protocol: "ws",
      path: '/mqtt'
    };
    this.ngxMqttService.connect(MQTT_SERVICE_OPTIONS);

    return this.ngxMqttService.state;
  }

  topic(topicName: string): Observable<IMqttMessage> {     
    return this.ngxMqttService.observe(topicName);
  }

  sendmsg(topicName:string, message:string): void {
    // use unsafe publish for non-ssl websockets
    this.ngxMqttService.unsafePublish(topicName, message, { qos: 2, retain: true })
    //qos: 0 como mucho una
    //qos: 1 como mínimo una
    //qos: 2 justo una
  }

}

app.component.html

  <main class="container" style="max-width: 600px">
  <div class="row" style="margin-top: 1rem">
    <h1>Cliente MQTT con Angular 13</h1>
    <details [open]="connectionDetailsElementOpen">
      <summary>DETALLES DE CONEXIÓN</summary>
      <form [formGroup]="connectionForm" (submit)="toConnect()">
        <label for="">
          Broker MQTT
          <input
            type="text"
            placeholder="Aquí el broker MQTT"
            formControlName="brokerHost"
          />
          <small>Por ejemplo public.mqtthq.com</small>
        </label>
        <label for="">
          Puerto
          <input
            type="number"
            placeholder="Aquí el puerto"
            formControlName="brokerPort"
          />
          <small>Por ejemplo 8083</small>
        </label>

        <button id="connectButtonDOMElement" type="submit" [disabled]="connectionForm.invalid" [class.secondary] = "connectionForm.invalid">
          CONECTAR
        </button>
      </form>
    </details>
    <details class="scale-in-center" [class.toHidden]="connectionDetailsElementOpen" open>
        <summary>DETALLES PARA ENVÍO DE MENSAJE</summary>
        <form [formGroup]="topicForm" (submit)="toSubcribeToTopic()">
            <label for="">
              Topic al que subscribirse
              <input
                type="text"
                placeholder="Aquí el topic"
                formControlName="mqttTopic"
              />
              <small>Por ejemplo /fjmduran/test</small>
            </label>           
            <button id="topicButtonDOMElement" type="submit" [disabled]="topicForm.invalid">
              SUBSCRIBIR
            </button>
          </form>
          <form [formGroup]="messageForm" (submit)="toSendMessage()" *ngIf="topicSub && !topicSub.closed">         
            <label for="">
              Mensaje a enviar
              <input
                type="text"
                placeholder="Aquí el mensaje"
                formControlName="mqttMessage"
              />
              <small>Por ejemplo Hola MQTT!</small>
            </label>
    
            <button type="submit" [disabled]="messageForm.invalid">
              ENVIAR
            </button>
          </form>
    </details>
  </div>
  <ul class="terminal">
    <li *ngFor="let message of mLog">{{message}}</li>
  </ul>
</main>

app.component.ts

import { Component } from '@angular/core';
import {
  FormBuilder,
  FormControl,
  FormGroup,
  Validators,
} from '@angular/forms';
import { MqttConnectionState } from 'ngx-mqtt';
import { Observable, Subscription } from 'rxjs';
import { formatHour } from './helpers/date.helper';
import { MyMqttService } from './services/mqtt.service';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css'],
})
export class AppComponent {
  title = 'Cliente MQTT con Angular 13';
  connectionForm: FormGroup;
  topicForm: FormGroup;
  messageForm: FormGroup;

  mLog: string[] = [];

  public mqttConnectionSub!: Subscription;
  public topicSub!: Subscription;
  private subs:Subscription[]=[];

  public connectionDetailsElementOpen = true;
  private connectButtonDOMElement: Element | null = null;
  private topicButtonDOMElement: Element | null = null;

  constructor(private fb: FormBuilder, private mqttService: MyMqttService) {
    this.connectionForm = this.fb.group({
      brokerHost: new FormControl('public.mqtthq.com', [Validators.required]),
      brokerPort: new FormControl(8083, [
        Validators.required,
        Validators.min(0),
        Validators.max(9999),
      ]),
    });
    this.topicForm = this.fb.group({
      mqttTopic: new FormControl('/fjmduran/test', [
        Validators.required,
        Validators.minLength(1),
      ]),
    });
    this.messageForm = this.fb.group({
      mqttMessage: new FormControl('Hola MQTT!', [
        Validators.required,
        Validators.minLength(1),
      ]),
    });
  }

  ngOnInit() {
    this.connectButtonDOMElement = document.getElementById('connectButtonDOMElement');
    this.topicButtonDOMElement = document.getElementById('topicButtonDOMElement');
    this.writeLog('https://fjmduran.com 🤓');
  }

  toConnect() {
    const hostName: string = this.connectionForm.get('brokerHost')?.value;
    const port: number = this.connectionForm.get('brokerPort')?.value;

    if (this.mqttConnectionSub && !this.mqttConnectionSub.closed) {
      this.mqttConnectionSub.unsubscribe();
      this.writeLog('🔴 CONEXIÓN CERRADA');
      this.changeConnectionStatus('desconectado');
      return;
    }
    this.mqttConnectionSub = this.mqttService
      .connect(hostName, port)
      .subscribe((connectionState: MqttConnectionState) => {
        switch (connectionState) {
          case MqttConnectionState.CLOSED:
            this.writeLog('CONEXIÓN CERRADA, REINTENTANDO...');
            this.changeConnectionStatus('esperando');
            break;
          case MqttConnectionState.CONNECTING:
            this.writeLog('CONECTANDO...');
            this.changeConnectionStatus('esperando');
            break;
          case MqttConnectionState.CONNECTED:
            this.writeLog('🟢 CONECTADO');
            this.changeConnectionStatus('conectado');
            break;
        }
      });
      this.subs.push(this.mqttConnectionSub);
  }

  private changeConnectionStatus(
    status: 'conectado' | 'esperando' | 'desconectado'
  ) {
    switch (status) {
      case 'esperando':
        this.connectionDetailsElementOpen = true;
        this.connectButtonDOMElement?.setAttribute('aria-busy', String(true));
        if (this.connectButtonDOMElement) {
          this.connectButtonDOMElement.innerHTML = 'CONECTANDO...';
          this.connectButtonDOMElement.className = 'secondary';
        }
        break;
      case 'desconectado':
        this.connectionDetailsElementOpen = true;
        this.connectButtonDOMElement?.setAttribute('aria-busy', String(false));
        if (this.connectButtonDOMElement) {
          this.connectButtonDOMElement.innerHTML = 'CONECTAR';
          this.connectButtonDOMElement.className = 'primary';
        }
        break;
      case 'conectado':
        this.connectionDetailsElementOpen = false;
        this.connectButtonDOMElement?.setAttribute('aria-busy', String(false));
        if (this.connectButtonDOMElement) {
          this.connectButtonDOMElement.innerHTML = 'DESCONECTAR';
          this.connectButtonDOMElement.className = 'secondary';
        }
        break;
    }
  }

  public toSubcribeToTopic(){
    const topic=this.topicForm.get('mqttTopic')?.value;
    if(this.topicSub && !this.topicSub.closed){
      this.topicSub.unsubscribe();
      this.writeLog(ELIMINADA SUBSCRIPCIÓN A topic);
      if (this.topicButtonDOMElement) {
        this.topicButtonDOMElement.innerHTML = 'SUBSCRIBIR';
        this.topicButtonDOMElement.className = 'primary';
      }
      return;
    } 
    this.topicSub= this.mqttService.topic(topic).subscribe(message=>{
      const messageUTF8=new TextDecoder('utf-8').decode(message.payload);
      console.log(messageUTF8);
      this.writeLog(RECIBIDO: messageUTF8 👈)
    });
    this.writeLog(SUBSCRITO AL TOPIC topic);
    if (this.topicButtonDOMElement) {
      this.topicButtonDOMElement.innerHTML = 'ELIMINAR SUBSCRIPCIÓN';
      this.topicButtonDOMElement.className = 'secondary';
    }
    this.subs.push(this.topicSub);
  }

  public toSendMessage(){
    const topic=this.topicForm.get('mqttTopic')?.value;
    const message=this.messageForm.get('mqttMessage')?.value;
    let logMessage='';
    if(!topic){      
      logMessage=Falta el topic;
      console.log(logMessage);
      this.writeLog(logMessage);
      return;
    }
    if(!message){
      logMessage=Falta el mensaje;
      console.log(logMessage);
      this.writeLog(logMessage);
      return;
    }
    this.mqttService.sendmsg(topic,message);
    this.writeLog(ENVIADO {message} AL TOPIC {topic} 👉)
  }

  private writeLog(message: string): void {
    const now = new Date();
    const writeMessage = formatHour(now) -> message;
    console.log(writeMessage);
    this.mLog.unshift(writeMessage);
  }

 

  ngOnDestroy() {
    this.subs.forEach(sub=>sub.unsubscribe());
  }
}

  

Hasta luego 🖖

Servicios

Software

IoT

Digitalización

Aplicaciones móviles

Consultoría

fjmduran.com v0.1.2