ESPHome 2025.8.0b2
Loading...
Searching...
No Matches
mqtt_backend_esp32.h
Go to the documentation of this file.
1#pragma once
2
3#include "mqtt_backend.h"
4#ifdef USE_MQTT
5#ifdef USE_ESP32
6
7#include <string>
8#include <queue>
9#include <cstring>
10#include <mqtt_client.h>
11#include <freertos/FreeRTOS.h>
12#include <freertos/task.h>
17
18namespace esphome {
19namespace mqtt {
20
21struct Event {
22 esp_mqtt_event_id_t event_id;
23 std::vector<char> data;
26 std::string topic;
27 int msg_id;
28 bool retain;
29 int qos;
30 bool dup;
32 esp_mqtt_error_codes_t error_handle;
33
34 // Construct from esp_mqtt_event_t
35 // Any pointer values that are unsafe to keep are converted to safe copies
36 Event(const esp_mqtt_event_t &event)
37 : event_id(event.event_id),
38 data(event.data, event.data + event.data_len),
39 total_data_len(event.total_data_len),
40 current_data_offset(event.current_data_offset),
41 topic(event.topic, event.topic_len),
42 msg_id(event.msg_id),
43 retain(event.retain),
44 qos(event.qos),
45 dup(event.dup),
46 session_present(event.session_present),
47 error_handle(*event.error_handle) {}
48};
49
56
58 char *topic;
59 char *payload;
60 uint16_t payload_len; // MQTT max payload is 64KiB
61 uint8_t type : 2;
62 uint8_t qos : 2; // QoS only needs values 0-2
63 uint8_t retain : 1;
64 uint8_t reserved : 3; // Reserved for future use
65
66 QueueElement() : topic(nullptr), payload(nullptr), payload_len(0), qos(0), retain(0), reserved(0) {}
67
68 // Helper to set topic/payload (uses RAMAllocator)
69 bool set_data(const char *topic_str, const char *payload_data, size_t len) {
70 // Check payload size limit (MQTT max is 64KiB)
71 if (len > std::numeric_limits<uint16_t>::max()) {
72 return false;
73 }
74
75 // Use RAMAllocator with default flags (tries external RAM first, falls back to internal)
76 RAMAllocator<char> allocator;
77
78 // Allocate and copy topic
79 size_t topic_len = strlen(topic_str) + 1;
80 topic = allocator.allocate(topic_len);
81 if (!topic)
82 return false;
83 memcpy(topic, topic_str, topic_len);
84
85 if (payload_data && len) {
86 payload = allocator.allocate(len);
87 if (!payload) {
88 allocator.deallocate(topic, topic_len);
89 topic = nullptr;
90 return false;
91 }
92 memcpy(payload, payload_data, len);
93 payload_len = static_cast<uint16_t>(len);
94 } else {
95 payload = nullptr;
96 payload_len = 0;
97 }
98 return true;
99 }
100
101 // Helper to release (uses RAMAllocator)
102 void release() {
103 RAMAllocator<char> allocator;
104 if (topic) {
105 allocator.deallocate(topic, strlen(topic) + 1);
106 topic = nullptr;
107 }
108 if (payload) {
109 allocator.deallocate(payload, payload_len);
110 payload = nullptr;
111 }
112 payload_len = 0;
113 }
114};
115
116class MQTTBackendESP32 final : public MQTTBackend {
117 public:
118 static const size_t MQTT_BUFFER_SIZE = 4096;
119 static const size_t TASK_STACK_SIZE = 3072;
120 static const size_t TASK_STACK_SIZE_TLS = 4096; // Larger stack for TLS operations
121 static const ssize_t TASK_PRIORITY = 5;
122 static const uint8_t MQTT_QUEUE_LENGTH = 30; // 30*12 bytes = 360
123
124 void set_keep_alive(uint16_t keep_alive) final { this->keep_alive_ = keep_alive; }
125 void set_client_id(const char *client_id) final { this->client_id_ = client_id; }
126 void set_clean_session(bool clean_session) final { this->clean_session_ = clean_session; }
127
128 void set_credentials(const char *username, const char *password) final {
129 if (username)
130 this->username_ = username;
131 if (password)
132 this->password_ = password;
133 }
134 void set_will(const char *topic, uint8_t qos, bool retain, const char *payload) final {
135 if (topic)
136 this->lwt_topic_ = topic;
137 this->lwt_qos_ = qos;
138 if (payload)
139 this->lwt_message_ = payload;
140 this->lwt_retain_ = retain;
141 }
142 void set_server(network::IPAddress ip, uint16_t port) final {
143 this->host_ = ip.str();
144 this->port_ = port;
145 }
146 void set_server(const char *host, uint16_t port) final {
147 this->host_ = host;
148 this->port_ = port;
149 }
150 void set_on_connect(std::function<on_connect_callback_t> &&callback) final {
151 this->on_connect_.add(std::move(callback));
152 }
153 void set_on_disconnect(std::function<on_disconnect_callback_t> &&callback) final {
154 this->on_disconnect_.add(std::move(callback));
155 }
156 void set_on_subscribe(std::function<on_subscribe_callback_t> &&callback) final {
157 this->on_subscribe_.add(std::move(callback));
158 }
159 void set_on_unsubscribe(std::function<on_unsubscribe_callback_t> &&callback) final {
160 this->on_unsubscribe_.add(std::move(callback));
161 }
162 void set_on_message(std::function<on_message_callback_t> &&callback) final {
163 this->on_message_.add(std::move(callback));
164 }
165 void set_on_publish(std::function<on_publish_user_callback_t> &&callback) final {
166 this->on_publish_.add(std::move(callback));
167 }
168 bool connected() const final { return this->is_connected_; }
169
170 void connect() final {
171 if (!is_initalized_) {
172 if (initialize_()) {
173 esp_mqtt_client_start(handler_.get());
174 }
175 }
176 }
177 void disconnect() final {
178 if (is_initalized_)
179 esp_mqtt_client_disconnect(handler_.get());
180 }
181
182 bool subscribe(const char *topic, uint8_t qos) final {
183#if defined(USE_MQTT_IDF_ENQUEUE)
184 return enqueue_(MQTT_QUEUE_TYPE_SUBSCRIBE, topic, qos);
185#else
186 return esp_mqtt_client_subscribe(handler_.get(), topic, qos) != -1;
187#endif
188 }
189 bool unsubscribe(const char *topic) final {
190#if defined(USE_MQTT_IDF_ENQUEUE)
192#else
193 return esp_mqtt_client_unsubscribe(handler_.get(), topic) != -1;
194#endif
195 }
196
197 bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain) final {
198#if defined(USE_MQTT_IDF_ENQUEUE)
199 return enqueue_(MQTT_QUEUE_TYPE_PUBLISH, topic, qos, retain, payload, length);
200#else
201 // might block for several seconds, either due to network timeout (10s)
202 // or if publishing payloads longer than internal buffer (due to message fragmentation)
203 return esp_mqtt_client_publish(handler_.get(), topic, payload, length, qos, retain) != -1;
204#endif
205 }
207
208 void loop() final;
209
210 void set_ca_certificate(const std::string &cert) { ca_certificate_ = cert; }
211 void set_cl_certificate(const std::string &cert) { cl_certificate_ = cert; }
212 void set_cl_key(const std::string &key) { cl_key_ = key; }
213 void set_skip_cert_cn_check(bool skip_check) { skip_cert_cn_check_ = skip_check; }
214
215 // No destructor needed: ESPHome components live for the entire device runtime.
216 // The MQTT task and queue will run until the device reboots or loses power,
217 // at which point the entire process terminates and FreeRTOS cleans up all tasks.
218 // Implementing a destructor would add complexity and potential race conditions
219 // for a scenario that never occurs in practice.
220
221 protected:
222 bool initialize_();
223 void mqtt_event_handler_(const Event &event);
224 static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data);
225
227 void operator()(esp_mqtt_client *client_handler) { esp_mqtt_client_destroy(client_handler); }
228 };
229 using ClientHandler_ = std::unique_ptr<esp_mqtt_client, MqttClientDeleter>;
231
232 bool is_connected_{false};
233 bool is_initalized_{false};
234
235 esp_mqtt_client_config_t mqtt_cfg_{};
236
237 std::string host_;
238 uint16_t port_;
239 std::string username_;
240 std::string password_;
241 std::string lwt_topic_;
242 std::string lwt_message_;
243 uint8_t lwt_qos_;
245 std::string client_id_;
246 uint16_t keep_alive_;
252#if defined(USE_MQTT_IDF_ENQUEUE)
253 static void esphome_mqtt_task(void *params);
256 TaskHandle_t task_handle_{nullptr};
257 bool enqueue_(MqttQueueTypeT type, const char *topic, int qos = 0, bool retain = false, const char *payload = NULL,
258 size_t len = 0);
259#endif
260
261 // callbacks
268 std::queue<Event> mqtt_events_;
269
270#if defined(USE_MQTT_IDF_ENQUEUE)
272 static constexpr uint32_t DROP_LOG_INTERVAL_MS = 10000; // Log every 10 seconds
273#endif
274};
275
276} // namespace mqtt
277} // namespace esphome
278
279#endif
280#endif
An STL allocator that uses SPI or internal RAM.
Definition helpers.h:818
void deallocate(T *p, size_t n)
Definition helpers.h:876
T * allocate(size_t n)
Definition helpers.h:838
void set_keep_alive(uint16_t keep_alive) final
void set_on_message(std::function< on_message_callback_t > &&callback) final
CallbackManager< on_connect_callback_t > on_connect_
CallbackManager< on_disconnect_callback_t > on_disconnect_
void set_ca_certificate(const std::string &cert)
CallbackManager< on_message_callback_t > on_message_
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
static - Dispatch event to instance method
void set_client_id(const char *client_id) final
optional< std::string > ca_certificate_
CallbackManager< on_subscribe_callback_t > on_subscribe_
static void esphome_mqtt_task(void *params)
void set_on_publish(std::function< on_publish_user_callback_t > &&callback) final
bool enqueue_(MqttQueueTypeT type, const char *topic, int qos=0, bool retain=false, const char *payload=NULL, size_t len=0)
void set_server(const char *host, uint16_t port) final
EventPool< struct QueueElement, MQTT_QUEUE_LENGTH > mqtt_event_pool_
void set_cl_key(const std::string &key)
void set_on_connect(std::function< on_connect_callback_t > &&callback) final
void set_will(const char *topic, uint8_t qos, bool retain, const char *payload) final
bool subscribe(const char *topic, uint8_t qos) final
void set_server(network::IPAddress ip, uint16_t port) final
void set_cl_certificate(const std::string &cert)
CallbackManager< on_unsubscribe_callback_t > on_unsubscribe_
esp_mqtt_client_config_t mqtt_cfg_
void set_skip_cert_cn_check(bool skip_check)
void set_on_unsubscribe(std::function< on_unsubscribe_callback_t > &&callback) final
bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain) final
CallbackManager< on_publish_user_callback_t > on_publish_
void set_clean_session(bool clean_session) final
bool unsubscribe(const char *topic) final
static constexpr uint32_t DROP_LOG_INTERVAL_MS
NotifyingLockFreeQueue< struct QueueElement, MQTT_QUEUE_LENGTH > mqtt_queue_
void set_on_subscribe(std::function< on_subscribe_callback_t > &&callback) final
void mqtt_event_handler_(const Event &event)
optional< std::string > cl_certificate_
void set_on_disconnect(std::function< on_disconnect_callback_t > &&callback) final
void set_credentials(const char *username, const char *password) final
std::unique_ptr< esp_mqtt_client, MqttClientDeleter > ClientHandler_
virtual bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain)=0
uint8_t type
__int64 ssize_t
Definition httplib.h:175
Providing packet encoding functions for exchanging data with a remote host.
Definition a01nyub.cpp:7
std::string size_t len
Definition helpers.h:279
esp_mqtt_event_id_t event_id
std::vector< char > data
esp_mqtt_error_codes_t error_handle
Event(const esp_mqtt_event_t &event)
void operator()(esp_mqtt_client *client_handler)
bool set_data(const char *topic_str, const char *payload_data, size_t len)
uint16_t length
Definition tt21100.cpp:0
uint8_t event_id
Definition tt21100.cpp:3