2021-11-30 14:51:24 +01:00

1282 lines
49 KiB
C

/*
* AWS IoT Over-the-air Update v3.2.0
* Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* @file ota_mqtt.c
* @brief Routines for supporting over the air updates using MQTT.
*/
/* Standard includes. */
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
/* OTA includes. */
#include "ota.h"
#include "ota_private.h"
#include "ota_cbor_private.h"
/* Private include. */
#include "ota_mqtt_private.h"
/* Include firmware version struct definition. */
#include "ota_appversion32.h"
/* Stream GET message constants. */
#define OTA_CLIENT_TOKEN "rdy" /*!< Arbitrary client token sent in the stream "GET" message. */
/* Agent to Job Service status message constants. */
#define OTA_STATUS_MSG_MAX_SIZE 128U /*!< Max length of a job status message to the service. */
/**
* @brief Topic strings used by the OTA process.
*
* These first few are topic extensions to the dynamic base topic that includes the Thing name.
*/
#define MQTT_API_THINGS "$aws/things/" /*!< Topic prefix for thing APIs. */
#define MQTT_API_JOBS_NEXT_GET "/jobs/$next/get" /*!< Topic suffix for job API. */
#define MQTT_API_JOBS_NOTIFY_NEXT "/jobs/notify-next" /*!< Topic suffix for job API. */
#define MQTT_API_JOBS "/jobs/" /*!< Job API identifier. */
#define MQTT_API_UPDATE "/update" /*!< Job API identifier. */
#define MQTT_API_STREAMS "/streams/" /*!< Stream API identifier. */
#define MQTT_API_DATA_CBOR "/data/cbor" /*!< Stream API suffix. */
#define MQTT_API_GET_CBOR "/get/cbor" /*!< Stream API suffix. */
/* NOTE: The format specifiers in this string are placeholders only; the lengths of these
* strings are used to calculate buffer sizes.
*/
static const char pOtaJobsGetNextTopicTemplate[] = MQTT_API_THINGS "%s"MQTT_API_JOBS_NEXT_GET; /*!< Topic template to request next job. */
static const char pOtaJobsNotifyNextTopicTemplate[] = MQTT_API_THINGS "%s"MQTT_API_JOBS_NOTIFY_NEXT; /*!< Topic template to notify next . */
static const char pOtaJobStatusTopicTemplate[] = MQTT_API_THINGS "%s"MQTT_API_JOBS "%s"MQTT_API_UPDATE; /*!< Topic template to update the current job. */
static const char pOtaStreamDataTopicTemplate[] = MQTT_API_THINGS "%s"MQTT_API_STREAMS "%s"MQTT_API_DATA_CBOR; /*!< Topic template to receive data over a stream. */
static const char pOtaGetStreamTopicTemplate[] = MQTT_API_THINGS "%s"MQTT_API_STREAMS "%s"MQTT_API_GET_CBOR; /*!< Topic template to request next data over a stream. */
static const char pOtaGetNextJobMsgTemplate[] = "{\"clientToken\":\"%u:%s\"}"; /*!< Used to specify client token id to authenticate job. */
static const char pOtaStringReceive[] = "\"receive\""; /*!< Used to build the job receive template. */
/** We map all of the above status cases to one of these status strings.
* These are the only strings that are supported by the Job Service. You
* shall not change them to arbitrary strings or the job will not change
* states.
* */
#define JOBS_API_STATUS_IN_PROGRESS "IN_PROGRESS" /*!< The job document has be received on the device and update is in progress. */
#define JOBS_API_STATUS_FAILED "FAILED" /*!< OTA update failed due to an error. */
#define JOBS_API_STATUS_SUCCEEDED "SUCCEEDED" /*!< OTA update succeeded. */
#define JOBS_API_STATUS_REJECTED "REJECTED" /*!< The job was rejected due to invalid parameters. */
/**
* @brief List of all the status cases a job can be in.
*
*/
static const char * pOtaJobStatusStrings[ NumJobStatusMappings ] =
{
"{\"status\":\""JOBS_API_STATUS_IN_PROGRESS "\",\"statusDetails\":{",
"{\"status\":\""JOBS_API_STATUS_FAILED "\",\"statusDetails\":{",
"{\"status\":\""JOBS_API_STATUS_SUCCEEDED "\",\"statusDetails\":{",
"{\"status\":\""JOBS_API_STATUS_REJECTED "\",\"statusDetails\":{",
"{\"status\":\""JOBS_API_STATUS_FAILED "\",\"statusDetails\":{", /* eJobStatus_FailedWithVal */
};
/**
* @brief These are the associated statusDetails 'reason' codes that go along with
* the above enums during the OTA update process. The 'Receiving' state is
* updated with transfer progress as number of blocks received of total blocks.
*
*/
static const char * pOtaJobReasonStrings[ NumJobReasons ] = { "", "ready", "active", "accepted", "rejected", "aborted" };
/**
* @brief These are used for both decimal and hex string conversions.
*/
static const char asciiDigits[] =
{
'0', '1', '2', '3',
'4', '5', '6', '7',
'8', '9', 'a', 'b',
'c', 'd', 'e', 'f',
};
/* Maximum lengths for constants used in the ota_mqtt_topic_strings templates.
* These are used to calculate the static size of buffers used to store MQTT
* topic and message strings. Each length is in terms of bytes. */
#define U32_MAX_LEN 10U /*!< Maximum number of output digits of an unsigned long value. */
#define JOB_NAME_MAX_LEN 128U /*!< Maximum length of the name of job documents received from the server. */
#define STREAM_NAME_MAX_LEN 44U /*!< Maximum length for the name of MQTT streams. */
#define NULL_CHAR_LEN 1U /*!< Size of a single null character used to terminate topics and messages. */
/* Pre-calculate max buffer size for mqtt topics and messages. We make sure the buffer size is large
* enough to hold a dynamically constructed topic and message string.
*/
#define TOPIC_PLUS_THINGNAME_LEN( topic ) ( CONST_STRLEN( topic ) + otaconfigMAX_THINGNAME_LEN + NULL_CHAR_LEN ) /*!< Calculate max buffer size based on topic template and thing name length. */
#define TOPIC_GET_NEXT_BUFFER_SIZE ( TOPIC_PLUS_THINGNAME_LEN( pOtaJobsGetNextTopicTemplate ) ) /*!< Max buffer size for `jobs/$next/get` topic. */
#define TOPIC_NOTIFY_NEXT_BUFFER_SIZE ( TOPIC_PLUS_THINGNAME_LEN( pOtaJobsNotifyNextTopicTemplate ) ) /*!< Max buffer size for `jobs/notify-next` topic. */
#define TOPIC_JOB_STATUS_BUFFER_SIZE ( TOPIC_PLUS_THINGNAME_LEN( pOtaJobStatusTopicTemplate ) + JOB_NAME_MAX_LEN ) /*!< Max buffer size for `jobs/<job_name>/update` topic. */
#define TOPIC_STREAM_DATA_BUFFER_SIZE ( TOPIC_PLUS_THINGNAME_LEN( pOtaStreamDataTopicTemplate ) + STREAM_NAME_MAX_LEN ) /*!< Max buffer size for `streams/<stream_name>/data/cbor` topic. */
#define TOPIC_GET_STREAM_BUFFER_SIZE ( TOPIC_PLUS_THINGNAME_LEN( pOtaGetStreamTopicTemplate ) + STREAM_NAME_MAX_LEN ) /*!< Max buffer size for `streams/<stream_name>/get/cbor` topic. */
#define MSG_GET_NEXT_BUFFER_SIZE ( TOPIC_PLUS_THINGNAME_LEN( pOtaGetNextJobMsgTemplate ) + U32_MAX_LEN ) /*!< Max buffer size for message of `jobs/$next/get topic`. */
/**
* @brief Subscribe to the jobs notification topic (i.e. New file version available).
*
* @param[in] pAgentCtx Agent context which stores the thing details and mqtt interface.
* @return OtaMqttStatus_t Result of the subscribe operation, OtaMqttSuccess if the operation is successful
*/
static OtaMqttStatus_t subscribeToJobNotificationTopics( const OtaAgentContext_t * pAgentCtx );
/**
* @brief UnSubscribe from the firmware update receive topic.
*
* @param[in] pAgentCtx Agent context which stores the thing details and mqtt interface.
* @return OtaMqttStatus_t Result of the unsubscribe operation, OtaMqttSuccess if the operation is successful.
*/
static OtaMqttStatus_t unsubscribeFromDataStream( const OtaAgentContext_t * pAgentCtx );
/**
* @brief UnSubscribe from the jobs notification topic.
*
* @param[in] pAgentCtx Agent context which stores the thing details and mqtt interface.
* @return OtaMqttStatus_t Result of the unsubscribe operation, OtaMqttSuccess if the operation is successful.
*/
static OtaMqttStatus_t unsubscribeFromJobNotificationTopic( const OtaAgentContext_t * pAgentCtx );
/**
* @brief Publish a message to the job status topic.
*
* @param[in] pAgentCtx Agent context which provides the details for the thing, job and mqtt interface.
* @param[in] pMsg Message to publish.
* @param[in] msgSize Size of message to send.
* @param[in] qos Quality of service level for mqtt.
* @return OtaMqttStatus_t OtaMqttSuccess if the message is publish is successful.
*/
static OtaMqttStatus_t publishStatusMessage( OtaAgentContext_t * pAgentCtx,
const char * pMsg,
uint32_t msgSize,
uint8_t qos );
/**
* @brief Populate the message buffer with the job status message.
*
* @param[in] pMsgBuffer Buffer to populate.
* @param[in] msgBufferSize Size of the message.
* @param[in] status Status of the operation.
* @param[in] pOTAFileCtx File context stores the information about the downloaded blocks and required size.
* @return uint32_t Size of the message built.
*/
static uint32_t buildStatusMessageReceiving( char * pMsgBuffer,
size_t msgBufferSize,
OtaJobStatus_t status,
const OtaFileContext_t * pOTAFileCtx );
/**
* @brief Populate the message buffer with the message to indicate device in self-test.
*
* @param[in] pMsgBuffer Buffer to populate.
* @param[in] msgBufferSize Size of the message.
* @param[in] status Status of the operation.
* @param[in] reason Reason for job failure (if any).
* @return uint32_t Size of the message.
*/
static uint32_t prvBuildStatusMessageSelfTest( char * pMsgBuffer,
size_t msgBufferSize,
OtaJobStatus_t status,
int32_t reason );
/**
* @brief Populate the response message with the status of the job.
*
* @param[in] pMsgBuffer Buffer to populate.
* @param[in] msgBufferSize Size of the message.
* @param[in] status Status of the operation.
* @param[in] reason Reason for failure or the new firmware version.
* @param[in] subReason Error code due to which the operation failed.
* @param[in] previousVersion Version from which the new version was updated.
* @return uint32_t Size of the message.
*/
static uint32_t prvBuildStatusMessageFinish( char * pMsgBuffer,
size_t msgBufferSize,
OtaJobStatus_t status,
int32_t reason,
int32_t subReason,
uint32_t previousVersion );
/**
* @brief Build a string from a set of strings
*
* @param[in] pBuffer Buffer to place the output string in.
* @param[in] bufferSizeBytes Size of the buffer pointed to by pBuffer.
* @param[in] strings NULL-terminated array of string pointers.
* @return size_t Length of the output string, not including the terminator.
*/
static size_t stringBuilder( char * pBuffer,
size_t bufferSizeBytes,
const char * strings[] );
/**
* @brief Build a string with the decimal representation of a uint32_t value.
*
* @param[in] pBuffer Buffer to place the output string in.
* @param[in] bufferSizeBytes Size of the buffer pointed to by pBuffer.
* @param[in] value The uint32_t value to convert.
* @return size_t Length of the output string, not including the terminator.
*/
static size_t stringBuilderUInt32Decimal( char * pBuffer,
size_t bufferSizeBytes,
uint32_t value );
/**
* @brief Build a string with the hex representation of a uint32_t value.
*
* @param[in] pBuffer Buffer to place the output string in.
* @param[in] bufferSizeBytes Size of the buffer pointed to by pBuffer.
* @param[in] value The uint32_t value to convert.
* @return size_t Length of the output string, not including the terminator.
*/
static size_t stringBuilderUInt32Hex( char * pBuffer,
size_t bufferSizeBytes,
uint32_t value );
static size_t stringBuilder( char * pBuffer,
size_t bufferSizeBytes,
const char * strings[] )
{
size_t curLen = 0;
int i;
size_t thisLength = 0;
pBuffer[ 0 ] = '\0';
for( i = 0; strings[ i ] != NULL; i++ )
{
thisLength = strlen( strings[ i ] );
/* Assert if there is not enough buffer space. */
assert( thisLength + curLen + 1 <= bufferSizeBytes );
( void ) strncat( pBuffer, strings[ i ], bufferSizeBytes - curLen - 1U );
curLen += thisLength;
}
return curLen;
}
static size_t stringBuilderUInt32Decimal( char * pBuffer,
size_t bufferSizeBytes,
uint32_t value )
{
char workBuf[ U32_MAX_LEN ];
char * pCur = workBuf;
char * pDest = pBuffer;
size_t size = 0;
/* Assert if there is not enough buffer space. */
assert( bufferSizeBytes >= U32_MAX_LEN );
( void ) bufferSizeBytes;
while( value > 0U )
{
*pCur++ = asciiDigits[ ( value % 10U ) ];
value /= 10U;
}
while( pCur > workBuf )
{
pDest[ size++ ] = *--pCur;
}
pDest[ size++ ] = '\0';
return size;
}
static size_t stringBuilderUInt32Hex( char * pBuffer,
size_t bufferSizeBytes,
uint32_t value )
{
char workBuf[ U32_MAX_LEN ];
char * pCur = workBuf;
char * pDest = pBuffer;
size_t size = 0;
int i;
/* Assert if there is not enough buffer space. */
assert( bufferSizeBytes >= U32_MAX_LEN );
( void ) bufferSizeBytes;
/* Render all 8 digits, including leading zeros. */
for( i = 0; i < 8; i++ )
{
*pCur++ = asciiDigits[ value & 15U ]; /* 15U = 0xF*/
value >>= 4;
}
while( pCur > workBuf )
{
pDest[ size++ ] = *--pCur;
}
pDest[ size++ ] = '\0';
return size;
}
/*
* Subscribe to the OTA job notification topics.
*/
static OtaMqttStatus_t subscribeToJobNotificationTopics( const OtaAgentContext_t * pAgentCtx )
{
OtaMqttStatus_t mqttStatus = OtaMqttSuccess;
uint16_t topicLen = 0;
/* This buffer is used to store generated MQTT topics. The static size
* are calculated from the templates and the corresponding parameters. */
static char pJobTopicNotifyNext[ TOPIC_NOTIFY_NEXT_BUFFER_SIZE ];
/* NULL-terminated list of topic string components */
const char * topicStringParts[] =
{
MQTT_API_THINGS,
NULL, /* Thing Name not available at compile time, initialized below */
MQTT_API_JOBS_NOTIFY_NEXT,
NULL
};
assert( pAgentCtx != NULL );
topicStringParts[ 1 ] = ( const char * ) pAgentCtx->pThingName;
topicLen = ( uint16_t ) stringBuilder(
pJobTopicNotifyNext,
sizeof( pJobTopicNotifyNext ),
topicStringParts );
/* The buffer is static and the size is calculated to fit. */
assert( ( topicLen > 0U ) && ( topicLen < sizeof( pJobTopicNotifyNext ) ) );
mqttStatus = pAgentCtx->pOtaInterface->mqtt.subscribe( pJobTopicNotifyNext,
topicLen,
1 );
if( mqttStatus == OtaMqttSuccess )
{
LogInfo( ( "Subscribed to MQTT topic: %s", pJobTopicNotifyNext ) );
}
else
{
LogError( ( "Failed to subscribe to MQTT topic: "
"subscribe returned error: "
"OtaMqttStatus_t=%s"
", topic=%s",
OTA_MQTT_strerror( mqttStatus ),
pJobTopicNotifyNext ) );
}
return mqttStatus;
}
/*
* UnSubscribe from the OTA data stream topic.
*/
static OtaMqttStatus_t unsubscribeFromDataStream( const OtaAgentContext_t * pAgentCtx )
{
OtaMqttStatus_t mqttStatus = OtaMqttSuccess;
/* This buffer is used to store the generated MQTT topic. The static size
* is calculated from the template and the corresponding parameters. */
char pOtaRxStreamTopic[ TOPIC_STREAM_DATA_BUFFER_SIZE ];
uint16_t topicLen = 0;
const OtaFileContext_t * pFileContext = NULL;
/* NULL-terminated list of topic string parts. */
const char * topicStringParts[] =
{
MQTT_API_THINGS,
NULL, /* Thing Name not available at compile time, initialized below. */
MQTT_API_STREAMS,
NULL, /* Stream Name not available at compile time, initialized below. */
MQTT_API_DATA_CBOR,
NULL
};
assert( pAgentCtx != NULL );
pFileContext = &( pAgentCtx->fileContext );
topicStringParts[ 1 ] = ( const char * ) pAgentCtx->pThingName;
topicStringParts[ 3 ] = ( const char * ) pFileContext->pStreamName;
/* Try to build the dynamic data stream topic and unsubscribe from it. */
topicLen = ( uint16_t ) stringBuilder(
pOtaRxStreamTopic,
sizeof( pOtaRxStreamTopic ),
topicStringParts );
/* The buffer is static and the size is calculated to fit. */
assert( ( topicLen > 0U ) && ( topicLen < sizeof( pOtaRxStreamTopic ) ) );
mqttStatus = pAgentCtx->pOtaInterface->mqtt.unsubscribe( pOtaRxStreamTopic,
topicLen,
1 );
if( mqttStatus == OtaMqttSuccess )
{
LogInfo( ( "Unsubscribed to MQTT topic: %s", pOtaRxStreamTopic ) );
}
else
{
LogError( ( "Failed to unsubscribe to MQTT topic: "
"unsubscribe returned error: "
"OtaMqttStatus_t=%s"
", topic=%s",
OTA_MQTT_strerror( mqttStatus ),
pOtaRxStreamTopic ) );
}
return mqttStatus;
}
/*
* Unsubscribe from the OTA job notification topics.
*/
static OtaMqttStatus_t unsubscribeFromJobNotificationTopic( const OtaAgentContext_t * pAgentCtx )
{
OtaMqttStatus_t mqttStatus = OtaMqttSuccess;
/* This buffer is used to store the generated MQTT topic. The static size
* is calculated from the template and the corresponding parameters. This
* buffer is used with two separate templates and its size is set fit the
* larger of the two. */
char pJobTopic[ TOPIC_NOTIFY_NEXT_BUFFER_SIZE ];
uint16_t topicLen = 0;
/* NULL-terminated list of topic string parts. */
const char * topicStringParts[] =
{
MQTT_API_THINGS,
NULL, /* Thing Name not available at compile time, initialized below. */
MQTT_API_JOBS_NOTIFY_NEXT,
NULL
};
assert( pAgentCtx != NULL );
/* Try to unsubscribe from the first of two job topics. */
topicStringParts[ 1 ] = ( const char * ) pAgentCtx->pThingName;
topicLen = ( uint16_t ) stringBuilder(
pJobTopic,
sizeof( pJobTopic ),
topicStringParts );
/* The buffer is static and the size is calculated to fit. */
assert( ( topicLen > 0U ) && ( topicLen < sizeof( pJobTopic ) ) );
mqttStatus = pAgentCtx->pOtaInterface->mqtt.unsubscribe( pJobTopic,
topicLen,
0 );
if( mqttStatus == OtaMqttSuccess )
{
LogInfo( ( "Unsubscribed to MQTT topic: %s", pJobTopic ) );
}
else
{
LogError( ( "Failed to unsubscribe to MQTT topic: "
"unsubscribe returned error: "
"OtaMqttStatus_t=%s"
", topic=%s",
OTA_MQTT_strerror( mqttStatus ),
pJobTopic ) );
}
return mqttStatus;
}
/*
* Publish a message to the job status topic.
*/
static OtaMqttStatus_t publishStatusMessage( OtaAgentContext_t * pAgentCtx,
const char * pMsg,
uint32_t msgSize,
uint8_t qos )
{
OtaMqttStatus_t mqttStatus = OtaMqttSuccess;
size_t topicLen = 0;
/* This buffer is used to store the generated MQTT topic. The static size
* is calculated from the template and the corresponding parameters. */
char pTopicBuffer[ TOPIC_JOB_STATUS_BUFFER_SIZE ];
/* NULL-terminated list of topic string parts. */
const char * topicStringParts[] =
{
MQTT_API_THINGS,
NULL, /* Thing Name not available at compile time, initialized below. */
MQTT_API_JOBS,
NULL, /* Active Job Name not available at compile time, initialized below. */
MQTT_API_UPDATE,
NULL
};
assert( pAgentCtx != NULL );
/* pMsg is a static buffer of size "OTA_STATUS_MSG_MAX_SIZE". */
assert( pMsg != NULL );
/* Build the dynamic job status topic . */
topicStringParts[ 1 ] = ( const char * ) pAgentCtx->pThingName;
topicStringParts[ 3 ] = ( const char * ) pAgentCtx->pActiveJobName;
topicLen = stringBuilder(
pTopicBuffer,
sizeof( pTopicBuffer ),
topicStringParts );
/* The buffer is static and the size is calculated to fit. */
assert( ( topicLen > 0U ) && ( topicLen < sizeof( pTopicBuffer ) ) );
/* Publish the status message. */
LogDebug( ( "Attempting to publish MQTT status message: "
"message=%s",
pMsg ) );
mqttStatus = pAgentCtx->pOtaInterface->mqtt.publish( pTopicBuffer,
( uint16_t ) topicLen,
&pMsg[ 0 ],
msgSize,
qos );
if( mqttStatus == OtaMqttSuccess )
{
LogDebug( ( "Published to MQTT topic: "
"topic=%s",
pTopicBuffer ) );
}
else
{
LogError( ( "Failed to publish MQTT message: "
"publish returned error: "
"OtaMqttStatus_t=%s"
", topic=%s",
OTA_MQTT_strerror( mqttStatus ),
pTopicBuffer ) );
}
return mqttStatus;
}
static uint32_t buildStatusMessageReceiving( char * pMsgBuffer,
size_t msgBufferSize,
OtaJobStatus_t status,
const OtaFileContext_t * pOTAFileCtx )
{
char receivedString[ U32_MAX_LEN + 1 ];
char numBlocksString[ U32_MAX_LEN + 1 ];
uint32_t numBlocks = 0;
uint32_t received = 0;
uint32_t msgSize = 0;
/* NULL-terminated list of JSON payload components */
/* NOTE: this must conform to the following format, do not add spaces, etc. */
/* "\"%s\":\"%u/%u\"}}" */
const char * payloadStringParts[] =
{
NULL, /* Job status is not available at compile time, initialized below. */
pOtaStringReceive,
":\"",
NULL, /* Received string is not available at compile time, initialized below. */
"/",
NULL, /* # blocks string is not available at compile time, initialized below. */
"\"}}",
NULL
};
assert( pMsgBuffer != NULL );
/* This function is only called when a file is received, so it can't be NULL. */
assert( pOTAFileCtx != NULL );
numBlocks = ( pOTAFileCtx->fileSize + ( OTA_FILE_BLOCK_SIZE - 1U ) ) >> otaconfigLOG2_FILE_BLOCK_SIZE;
received = numBlocks - pOTAFileCtx->blocksRemaining;
/* Output a status update once in a while. */
if( ( received % otaconfigOTA_UPDATE_STATUS_FREQUENCY ) == 0U )
{
payloadStringParts[ 0 ] = pOtaJobStatusStrings[ status ];
payloadStringParts[ 3 ] = receivedString;
payloadStringParts[ 5 ] = numBlocksString;
( void ) stringBuilderUInt32Decimal( receivedString, sizeof( receivedString ), received );
( void ) stringBuilderUInt32Decimal( numBlocksString, sizeof( numBlocksString ), numBlocks );
msgSize = ( uint32_t ) stringBuilder(
pMsgBuffer,
msgBufferSize,
payloadStringParts );
/* The buffer is static and the size is calculated to fit. */
assert( ( msgSize > 0U ) && ( msgSize < msgBufferSize ) );
}
return msgSize;
}
static uint32_t prvBuildStatusMessageSelfTest( char * pMsgBuffer,
size_t msgBufferSize,
OtaJobStatus_t status,
int32_t reason )
{
uint32_t msgSize = 0;
/* NULL-terminated list of JSON payload components */
/* NOTE: this must agree with the following format, do not add spaces, etc. */
/* "\"%s\":\"%s\",\"" OTA_JSON_UPDATED_BY_KEY_ONLY "\":\"0x%x\"}}" */
char versionString[ U32_MAX_LEN + 1 ];
const char * pPayloadStringParts[] =
{
NULL, /* Job status string not available at compile time, initialized below. */
"\"",
OTA_JSON_SELF_TEST_KEY_ONLY,
"\":\"",
NULL, /* Job reason string not available at compile time, initialized below. */
"\",\"" OTA_JSON_UPDATED_BY_KEY_ONLY "\":\"0x",
NULL, /* Version string not available at compile time, initialized below. */
"\"}}",
NULL
};
assert( pMsgBuffer != NULL );
( void ) stringBuilderUInt32Hex( versionString, sizeof( versionString ), appFirmwareVersion.u.unsignedVersion32 );
pPayloadStringParts[ 0 ] = pOtaJobStatusStrings[ status ];
pPayloadStringParts[ 4 ] = pOtaJobReasonStrings[ reason ];
pPayloadStringParts[ 6 ] = versionString;
msgSize = ( uint32_t ) stringBuilder(
pMsgBuffer,
msgBufferSize,
pPayloadStringParts );
assert( ( msgSize > 0U ) && ( msgSize < msgBufferSize ) );
LogDebug( ( "Created self test update: %s", pMsgBuffer ) );
return msgSize;
}
static uint32_t prvBuildStatusMessageFinish( char * pMsgBuffer,
size_t msgBufferSize,
OtaJobStatus_t status,
int32_t reason,
int32_t subReason,
uint32_t previousVersion )
{
uint32_t msgSize = 0;
char reasonString[ U32_MAX_LEN + 1 ];
char subReasonString[ U32_MAX_LEN + 1 ];
char newVersionMajorString[ U32_MAX_LEN + 1 ];
char newVersionMinorString[ U32_MAX_LEN + 1 ];
char newVersionBuildString[ U32_MAX_LEN + 1 ];
char prevVersionMajorString[ U32_MAX_LEN + 1 ];
char prevVersionMinorString[ U32_MAX_LEN + 1 ];
char prevVersionBuildString[ U32_MAX_LEN + 1 ];
AppVersion32_t newVersion = { 0 }, prevVersion = { 0 };
/* NULL-terminated list of payload string parts */
/* NOTE: this must conform to the following format, do not add spaces, etc. */
/* "\"reason\":\"0x%08x: 0x%08x\"}}" */
const char * pPayloadPartsStatusFailedWithValue[] =
{
NULL, /* Job status string not available at compile time, initialized below. */
"\"reason\":\"0x",
NULL, /* Reason string not available at compile time, initialized below. */
": 0x",
NULL, /* Sub-Reason string not available at compile time, initialized below. */
"\"}}",
NULL
};
/* NULL-terminated list of payload string parts */
/* NOTE: this must agree with the following format, do not add spaces, etc. */
/* "\"reason\":\"%s v%u.%u.%u\"}}" */
const char * pPayloadPartsStatusSucceeded[] =
{
NULL, /* Job status string not available at compile time, initialized below. */
"\"reason\":\"",
NULL, /* Reason string not available at compile time, initialized below. */
" v",
NULL, /* Version major string not available at compile time, initialized below. */
".",
NULL, /* Version minor string not available at compile time, initialized below. */
".",
NULL, /* Version build string not available at compile time, initialized below. */
"\",\"" OTA_JSON_UPDATED_BY_KEY_ONLY "\":\"", /* Expands to `","updatedBy":` */
" v",
NULL, /* Previous version major string not available at compile time, initialized below. */
".",
NULL, /* Previous version minor string not available at compile time, initialized below. */
".",
NULL, /* Previous version build string not available at compile time, initialized below. */
"\"}}",
NULL
};
/* NULL-terminated list of payload string parts */
/* NOTE: this must agree with the following format, do not add spaces, etc. */
/* "\"reason\":\"%s: 0x%08x\"}}" */
const char * pPayloadPartsStatusOther[] =
{
NULL, /* Job status string not available at compile time, initialized below. */
"\"reason\":\"",
NULL, /* Reason string not available at compile time, initialized below. */
": 0x",
NULL, /* Sub-Reason string not available at compile time, initialized below. */
"\"}}",
NULL
};
const char ** pPayloadParts;
assert( pMsgBuffer != NULL );
newVersion.u.signedVersion32 = subReason;
prevVersion.u.signedVersion32 = ( int32_t ) previousVersion;
( void ) stringBuilderUInt32Hex( reasonString, sizeof( reasonString ), ( uint32_t ) reason );
( void ) stringBuilderUInt32Hex( subReasonString, sizeof( subReasonString ), ( uint32_t ) subReason );
/* FailedWithVal uses a numeric OTA error code and sub-reason code to cover
* the case where there may be too many description strings to reasonably
* include in the code.
*/
if( status == JobStatusFailedWithVal )
{
pPayloadParts = pPayloadPartsStatusFailedWithValue;
pPayloadParts[ 0 ] = pOtaJobStatusStrings[ status ];
pPayloadParts[ 2 ] = reasonString;
pPayloadParts[ 4 ] = subReasonString;
}
/* If the status update is for "Succeeded," we are identifying the version
* of firmware that has been accepted. This makes it easy to find the
* version associated with each device (Thing) when examining the OTA jobs
* on the service side via the CLI or possibly with some console tool.
*/
else if( status == JobStatusSucceeded )
{
/* New version string.*/
( void ) stringBuilderUInt32Decimal( newVersionMajorString, sizeof( newVersionMajorString ), newVersion.u.x.major );
( void ) stringBuilderUInt32Decimal( newVersionMinorString, sizeof( newVersionMinorString ), newVersion.u.x.minor );
( void ) stringBuilderUInt32Decimal( newVersionBuildString, sizeof( newVersionBuildString ), newVersion.u.x.build );
/* Updater version string.*/
( void ) stringBuilderUInt32Decimal( prevVersionMajorString, sizeof( prevVersionMajorString ), prevVersion.u.x.major );
( void ) stringBuilderUInt32Decimal( prevVersionMinorString, sizeof( prevVersionMinorString ), prevVersion.u.x.minor );
( void ) stringBuilderUInt32Decimal( prevVersionBuildString, sizeof( prevVersionMinorString ), prevVersion.u.x.build );
pPayloadParts = pPayloadPartsStatusSucceeded;
pPayloadParts[ 0 ] = pOtaJobStatusStrings[ status ];
pPayloadParts[ 2 ] = pOtaJobReasonStrings[ reason ];
pPayloadParts[ 4 ] = newVersionMajorString;
pPayloadParts[ 6 ] = newVersionMinorString;
pPayloadParts[ 8 ] = newVersionBuildString;
pPayloadParts[ 11 ] = prevVersionMajorString;
pPayloadParts[ 13 ] = prevVersionMinorString;
pPayloadParts[ 15 ] = prevVersionBuildString;
}
else
{
pPayloadParts = pPayloadPartsStatusOther;
pPayloadParts[ 0 ] = pOtaJobStatusStrings[ status ];
pPayloadParts[ 2 ] = pOtaJobReasonStrings[ reason ];
pPayloadParts[ 4 ] = subReasonString;
}
msgSize = ( uint32_t ) stringBuilder(
pMsgBuffer,
msgBufferSize,
pPayloadParts );
/* The buffer is static and the size is calculated to fit. */
assert( ( msgSize > 0U ) && ( msgSize < msgBufferSize ) );
return msgSize;
}
/*
* Check for next available OTA job from the job service by publishing
* a "get next job" message to the job service.
*/
OtaErr_t requestJob_Mqtt( OtaAgentContext_t * pAgentCtx )
{
/* This buffer is used to store the generated MQTT topic. The static size
* is calculated from the template and the corresponding parameters. */
char pJobTopic[ TOPIC_GET_NEXT_BUFFER_SIZE ];
/* The following buffer is big enough to hold a dynamically constructed
* $next/get job message. It contains a client token that is used to track
* how many requests have been made. */
char pMsg[ MSG_GET_NEXT_BUFFER_SIZE ];
static uint32_t reqCounter = 0;
OtaErr_t otaError = OtaErrRequestJobFailed;
OtaMqttStatus_t mqttStatus = OtaMqttSuccess;
uint32_t msgSize = 0;
uint16_t topicLen = 0;
/* NULL-terminated list of topic string parts. */
const char * pTopicParts[] =
{
MQTT_API_THINGS,
NULL, /* Thing Name not available at compile time, initialized below. */
MQTT_API_JOBS_NEXT_GET,
NULL
};
char reqCounterString[ U32_MAX_LEN + 1 ];
/* NULL-terminated list of payload parts */
/* NOTE: this must agree with pOtaGetNextJobMsgTemplate, do not add spaces, etc. */
const char * pPayloadParts[] =
{
"{\"clientToken\":\"",
NULL, /* Request counter string not available at compile time, initialized below. */
":",
NULL, /* Thing Name not available at compile time, initialized below. */
"\"}",
NULL
};
assert( pAgentCtx != NULL );
pTopicParts[ 1 ] = ( const char * ) pAgentCtx->pThingName;
pPayloadParts[ 1 ] = reqCounterString;
pPayloadParts[ 3 ] = ( const char * ) pAgentCtx->pThingName;
( void ) stringBuilderUInt32Decimal( reqCounterString, sizeof( reqCounterString ), reqCounter );
/* Subscribe to the OTA job notification topic. */
mqttStatus = subscribeToJobNotificationTopics( pAgentCtx );
if( mqttStatus == OtaMqttSuccess )
{
LogDebug( ( "MQTT job request number: counter=%u", reqCounter ) );
msgSize = ( uint32_t ) stringBuilder(
pMsg,
sizeof( pMsg ),
pPayloadParts );
/* The buffer is static and the size is calculated to fit. */
assert( ( msgSize > 0U ) && ( msgSize < sizeof( pMsg ) ) );
reqCounter++;
topicLen = ( uint16_t ) stringBuilder(
pJobTopic,
sizeof( pJobTopic ),
pTopicParts );
/* The buffer is static and the size is calculated to fit. */
assert( ( topicLen > 0U ) && ( topicLen < sizeof( pJobTopic ) ) );
mqttStatus = pAgentCtx->pOtaInterface->mqtt.publish( pJobTopic, topicLen, pMsg, msgSize, 1 );
if( mqttStatus == OtaMqttSuccess )
{
LogDebug( ( "Published MQTT request to get the next job: "
"topic=%s",
pJobTopic ) );
otaError = OtaErrNone;
}
else
{
LogError( ( "Failed to publish MQTT message:"
"publish returned error: "
"OtaMqttStatus_t=%s",
OTA_MQTT_strerror( mqttStatus ) ) );
}
}
return otaError;
}
/*
* Update the job status on the service side with progress or completion info.
*/
OtaErr_t updateJobStatus_Mqtt( OtaAgentContext_t * pAgentCtx,
OtaJobStatus_t status,
int32_t reason,
int32_t subReason )
{
OtaErr_t result = OtaErrNone;
OtaMqttStatus_t mqttStatus = OtaMqttPublishFailed;
/* A message size of zero means don't publish anything. */
uint32_t msgSize = 0U;
/* All job state transitions except streaming progress use QOS 1 since it is required to have status in the job document. */
char pMsg[ OTA_STATUS_MSG_MAX_SIZE ];
uint8_t qos = 1;
const OtaFileContext_t * pFileContext = NULL;
assert( pAgentCtx != NULL );
/* Get the current file context. */
pFileContext = &( pAgentCtx->fileContext );
if( status == JobStatusInProgress )
{
if( reason == ( int32_t ) JobReasonReceiving )
{
msgSize = buildStatusMessageReceiving( pMsg, sizeof( pMsg ), status, pFileContext );
/* Downgrade Progress updates to QOS 0 to avoid overloading MQTT buffers during active streaming. */
qos = 0;
}
else
{
/* We're no longer receiving but we're still In Progress so we are implicitly in the Self
* Test phase. Prepare to update the job status with the self_test phase (ready or active). */
msgSize = prvBuildStatusMessageSelfTest( pMsg, sizeof( pMsg ), status, reason );
}
}
else
{
/* The potential values for status are constant at compile time. */
assert( status < NumJobStatusMappings );
msgSize = prvBuildStatusMessageFinish( pMsg, sizeof( pMsg ), status, reason, subReason, pAgentCtx->fileContext.updaterVersion );
}
if( msgSize > 0U )
{
/* Publish the string created above. */
mqttStatus = publishStatusMessage( pAgentCtx, pMsg, msgSize, qos );
if( mqttStatus == OtaMqttSuccess )
{
LogDebug( ( "Published update to the job status." ) );
}
else
{
LogError( ( "Failed to publish MQTT status message: "
"publishStatusMessage returned error: "
"OtaMqttStatus_t=%s",
OTA_MQTT_strerror( mqttStatus ) ) );
result = OtaErrUpdateJobStatusFailed;
}
}
return result;
}
/*
* Init file transfer by subscribing to the OTA data stream topic.
*/
OtaErr_t initFileTransfer_Mqtt( OtaAgentContext_t * pAgentCtx )
{
OtaErr_t result = OtaErrInitFileTransferFailed;
OtaMqttStatus_t mqttStatus = OtaMqttSuccess;
/* This buffer is used to store the generated MQTT topic. The static size
* is calculated from the template and the corresponding parameters. */
static char pRxStreamTopic[ TOPIC_STREAM_DATA_BUFFER_SIZE ]; /*!< Buffer to store the topic generated for requesting data stream. */
uint16_t topicLen = 0;
const OtaFileContext_t * pFileContext = NULL;
/* NULL-terminated list of topic string parts. */
const char * pTopicParts[] =
{
MQTT_API_THINGS,
NULL, /* Thing Name not available at compile time, initialized below. */
MQTT_API_STREAMS,
NULL, /* Stream Name not available at compile time, initialized below. */
MQTT_API_DATA_CBOR,
NULL
};
assert( pAgentCtx != NULL );
pFileContext = &( pAgentCtx->fileContext );
pTopicParts[ 1 ] = ( const char * ) pAgentCtx->pThingName;
pTopicParts[ 3 ] = ( const char * ) pFileContext->pStreamName;
topicLen = ( uint16_t ) stringBuilder(
pRxStreamTopic,
sizeof( pRxStreamTopic ),
pTopicParts );
/* The buffer is static and the size is calculated to fit. */
assert( ( topicLen > 0U ) && ( topicLen < sizeof( pRxStreamTopic ) ) );
mqttStatus = pAgentCtx->pOtaInterface->mqtt.subscribe( pRxStreamTopic,
topicLen,
0 );
if( mqttStatus == OtaMqttSuccess )
{
LogDebug( ( "Subscribed to the OTA data stream topic: "
"topic=%s",
pRxStreamTopic ) );
result = OtaErrNone;
}
else
{
LogError( ( "Failed to subscribe to MQTT topic: "
"subscribe returned error: "
"OtaMqttStatus_t=%s"
", topic=%s",
OTA_MQTT_strerror( mqttStatus ),
pRxStreamTopic ) );
}
return result;
}
/*
* Request file block by publishing to the get stream topic.
*/
OtaErr_t requestFileBlock_Mqtt( OtaAgentContext_t * pAgentCtx )
{
OtaErr_t result = OtaErrRequestFileBlockFailed;
OtaMqttStatus_t mqttStatus = OtaMqttSuccess;
size_t msgSizeFromStream = 0;
uint32_t blockSize = OTA_FILE_BLOCK_SIZE;
uint32_t numBlocks = 0;
uint32_t bitmapLen = 0;
uint32_t msgSizeToPublish = 0;
uint32_t topicLen = 0;
bool cborEncodeRet = false;
char pMsg[ OTA_REQUEST_MSG_MAX_SIZE ];
/* This buffer is used to store the generated MQTT topic. The static size
* is calculated from the template and the corresponding parameters. */
char pTopicBuffer[ TOPIC_GET_STREAM_BUFFER_SIZE ];
const OtaFileContext_t * pFileContext = NULL;
/* NULL-terminated list of topic string parts. */
const char * pTopicParts[] =
{
MQTT_API_THINGS,
NULL, /* Thing Name not available at compile time, initialized below. */
MQTT_API_STREAMS,
NULL, /* Stream Name not available at compile time, initialized below. */
MQTT_API_GET_CBOR,
NULL
};
assert( pAgentCtx != NULL );
/* Get the current file context. */
pFileContext = &( pAgentCtx->fileContext );
pTopicParts[ 1 ] = ( const char * ) pAgentCtx->pThingName;
pTopicParts[ 3 ] = ( const char * ) pFileContext->pStreamName;
/* Reset number of blocks requested. */
pAgentCtx->numOfBlocksToReceive = otaconfigMAX_NUM_BLOCKS_REQUEST;
numBlocks = ( pFileContext->fileSize + ( OTA_FILE_BLOCK_SIZE - 1U ) ) >> otaconfigLOG2_FILE_BLOCK_SIZE;
bitmapLen = ( numBlocks + ( BITS_PER_BYTE - 1U ) ) >> LOG2_BITS_PER_BYTE;
cborEncodeRet = OTA_CBOR_Encode_GetStreamRequestMessage( ( uint8_t * ) pMsg,
sizeof( pMsg ),
&msgSizeFromStream,
OTA_CLIENT_TOKEN,
( int32_t ) pFileContext->serverFileID,
( int32_t ) blockSize,
0,
pFileContext->pRxBlockBitmap,
bitmapLen,
( int32_t ) otaconfigMAX_NUM_BLOCKS_REQUEST );
if( cborEncodeRet == true )
{
msgSizeToPublish = ( uint32_t ) msgSizeFromStream;
/* Try to build the dynamic data REQUEST topic to publish to. */
topicLen = ( uint32_t ) stringBuilder(
pTopicBuffer,
sizeof( pTopicBuffer ),
pTopicParts );
/* The buffer is static and the size is calculated to fit. */
assert( ( topicLen > 0U ) && ( topicLen < sizeof( pTopicBuffer ) ) );
mqttStatus = pAgentCtx->pOtaInterface->mqtt.publish( pTopicBuffer,
( uint16_t ) topicLen,
&pMsg[ 0 ],
msgSizeToPublish,
0 );
if( mqttStatus == OtaMqttSuccess )
{
LogInfo( ( "Published to MQTT topic to request the next block: "
"topic=%s",
pTopicBuffer ) );
result = OtaErrNone;
}
else
{
LogError( ( "Failed to publish MQTT message: "
"publish returned error: "
"OtaMqttStatus_t=%s",
OTA_MQTT_strerror( mqttStatus ) ) );
}
}
else
{
result = OtaErrFailedToEncodeCbor;
LogError( ( "Failed to CBOR encode stream request message: "
"OTA_CBOR_Encode_GetStreamRequestMessage returned error." ) );
}
return result;
}
/*
* Decode a cbor encoded fileblock received from streaming service.
*/
OtaErr_t decodeFileBlock_Mqtt( const uint8_t * pMessageBuffer,
size_t messageSize,
int32_t * pFileId,
int32_t * pBlockId,
int32_t * pBlockSize,
uint8_t ** pPayload,
size_t * pPayloadSize )
{
OtaErr_t result = OtaErrFailedToDecodeCbor;
bool cborDecodeRet = false;
/* Decode the CBOR content. */
cborDecodeRet = OTA_CBOR_Decode_GetStreamResponseMessage( pMessageBuffer,
messageSize,
pFileId,
pBlockId, /* CBOR requires pointer to int and our block indices never exceed 31 bits. */
pBlockSize, /* CBOR requires pointer to int and our block sizes never exceed 31 bits. */
pPayload, /* This payload gets malloc'd by OTA_CBOR_Decode_GetStreamResponseMessage(). We must free it. */
pPayloadSize );
if( cborDecodeRet == true )
{
/* pPayload and pPayloadSize is allocated by the caller. */
assert( ( pPayload != NULL ) && ( pPayloadSize != NULL ) );
result = OtaErrNone;
}
else
{
LogError( ( "Failed to decode MQTT file block: "
"OTA_CBOR_Decode_GetStreamResponseMessage returned error." ) );
}
return result;
}
/*
* Perform any cleanup operations required for control plane.
*/
OtaErr_t cleanupControl_Mqtt( const OtaAgentContext_t * pAgentCtx )
{
OtaErr_t result = OtaErrNone;
OtaMqttStatus_t mqttStatus = OtaMqttSuccess;
assert( pAgentCtx != NULL );
if( pAgentCtx->unsubscribeOnShutdown != 0U )
{
/* Unsubscribe from job notification topics. */
mqttStatus = unsubscribeFromJobNotificationTopic( pAgentCtx );
if( mqttStatus != OtaMqttSuccess )
{
LogWarn( ( "Failed cleanup for MQTT control plane: "
"unsubscribeFromJobNotificationTopic returned error: "
"OtaMqttStatus_t=%s",
OTA_MQTT_strerror( mqttStatus ) ) );
result = OtaErrCleanupControlFailed;
}
}
return result;
}
/*
* Perform any cleanup operations required for data plane.
*/
OtaErr_t cleanupData_Mqtt( const OtaAgentContext_t * pAgentCtx )
{
OtaErr_t result = OtaErrNone;
OtaMqttStatus_t mqttStatus = OtaMqttSuccess;
assert( pAgentCtx != NULL );
if( pAgentCtx->unsubscribeOnShutdown != 0U )
{
/* Unsubscribe from data stream topics. */
mqttStatus = unsubscribeFromDataStream( pAgentCtx );
if( mqttStatus != OtaMqttSuccess )
{
LogWarn( ( "Failed cleanup for MQTT data plane: "
"unsubscribeFromDataStream returned error: "
"OtaMqttStatus_t=%s",
OTA_MQTT_strerror( mqttStatus ) ) );
result = OtaErrCleanupDataFailed;
}
}
return result;
}
const char * OTA_MQTT_strerror( OtaMqttStatus_t status )
{
const char * str = NULL;
switch( status )
{
case OtaMqttSuccess:
str = "OtaMqttSuccess";
break;
case OtaMqttPublishFailed:
str = "OtaMqttPublishFailed";
break;
case OtaMqttSubscribeFailed:
str = "OtaMqttSubscribeFailed";
break;
case OtaMqttUnsubscribeFailed:
str = "OtaMqttUnsubscribeFailed";
break;
default:
str = "InvalidErrorCode";
break;
}
return str;
}