Common (node.js + browser)
These classes and functions are available via both @transitive-sdk/utils and @transitive-sdk/utils-web.
DataCache
A class implementing a local data cache, used as a local data store with deduplication detection and update events. While this class is very handy you probably won't need to create instances of it directly. Instead use the mqttSync.data instance which holds the locally stored data subscribed/published from/to MQTTSync. For example on the robot:
// update/publish our status:
mqttSync.data.update('status', {changed: Date.now(), msg: 'OK'});
// subscribe to new user requests (e.g., from UI):
mqttSync.data.subscribePath('+user/request', (request, key, {user}) => {
log.debug(`user ${user} made request`, request);
});
In the cloud or in a web component you would need to use the full topic including org, device, scope, cap-name, and version.
Parameters
data(optional, default{})
filter
Filter the object using path with wildcards
Parameters
path
filterByTopic
Filter the object using topic with wildcards
Parameters
topic
forMatch
For each topic match, invoke the callback with the value, path, and match just like subscribePath, but on the current data rather than future changes.
Parameters
topiccallback
forPathMatch
For each path match, invoke the callback with the value, path, and match just like subscribePath
Parameters
pathcallback
get
Get sub-value at path, or entire object if none given
Parameters
path(optional, default[])
getByTopic
Get sub-value specified by topic
Parameters
topic
subscribe
Add a callback for all change events.
Parameters
callback
subscribePath
Subscribe to a specific path (array) only. Callback receives
value, key, matched, tags.
Parameters
pathcallback
subscribePathFlat
Same as subscribePath but always get all changes in flat form
Parameters
topiccallback
subscribeTopic
Subscribe to a specific topic only. Callback receives
value, key, matched, tags.
Parameters
topiccallback
unsubscribe
Remove a callback previously registered using subscribe.
Parameters
callback
update
Update the value at the given path (array or dot separated string)
Parameters
pathvaluetags
updateFromArray
Update the object with the given value at the given path, remove empty;
return the flat changes (see toFlatObject). Add tags to updates to mark
them somehow based on the context, e.g., so that some subscriptions can choose
to ignore updates with a certain tag.
Parameters
pathvaluetags(optional, default{})
updateFromModifier
Update data from a modifier object where keys are topic names to be interpreted as paths, and values are the values to set
Parameters
modifiertags
updateFromTopic
Set value from the given topic (with or without leading or trailing slash)
Parameters
topicvaluetags
MqttSync
A class that combines DataCache and MQTT to implement a data synchronization feature over the latter. Relies on retained messages in mqtt for persistence.
Parameters
-
optionsobjectoptions.mqttClientobject An already connected mqtt.js client.options.onChangefunction? A function that is called any time there is a change to the shared data. This is not usually used. It's usually better to use the finer grainedMqttSync.data.subscribePathinstead, that allows you to subscribe to changes just on a specific sun-object instead, see DataCache.options.ignoreRetainboolean? retain all messages, ignorant of the retain flag.options.migratearray? an array of objects of the form{topic, newVersion, level}. Only meaningful in the cloud. Instructs MQTTSync to first migrate existing topics to a new version namespace, publishing at the designated level down from the version level. For example:js [{ topic: `/myorg/mydevice/@local/my-cap/+/config`, newVersion: this.version, level: 1 }]Would migrate any existing data in the capability'sconfignamespace to the current version of the package, publishing at theconfig/+level (rather than atomically at the config level itself).options.onReadyfunction? A function that is called when the MQTTSync client is ready and has completed any requested migrations.options.sliceTopicnumber? a number indicating at what level to slice the topic, i.e., only use a suffix. Used in robot-capabilities to slice off the topic prefix (namespaces).options.onHeartbeatGrantedoptions.inclMeta
beforeDisconnect
Run all registered hooks before disconnecting
call
Make an RPC request. Example:
mqttSync.call('/orgId/deviceId/@capScope/capName/capVersion/mySquare', 11, result => {
log.debug(`Called /mySquare with arg 11 and got ${result}`);
});
This would call the RPC 'mySquare' registered by version capVersion of the
capability @capScope/capName running on device deviceId by user orgId.
That RPC would have been registered on the device using
mqttSync.register('/mySquare', ...) as shown above (because namespaces
on the device are auto-extended to the org, device and capability).
RPCs are typically registered on the robot and called from the web or cloud but the inverse is also possible, with the same namespace caveat.
Alternative you can omit the callback and use async/await:
const result = await mqttSync.call('${prefix}/mySquare', 11);
log.debug(`Called /mySquare with arg 11 and got ${result}`);
See the note about namespaces in register.
Note: It is your responsibility to only call methods that exist (have been registered). Calling a non-existent command just hangs.
Parameters
commandargscallback(optional, defaultundefined)
clear
Delete all retained messages in a certain topic prefix, waiting for a mqtt broker heartbeat to collect existing retained. Use with care, never delete topics not owned by us. Harmless within capabilities, which are namespaced already.
options.filter(topic): a function that can be provided to further,
programmatically filter the set of topics to clear, e.g., to onlt clear
topics of old versions.
Note: This may not yet work in robot-capabilities, since the subscription prefix and received topic prefix don't match (the device prefix is added to subscription by localMQTT.
Parameters
prefixescallback(optional, defaultundefined)options(optional, default{})
clearThrottle
Clear the set throttling delay.
getRPCHandler
Given a (ground) topic find the matching RPC handler, if any. This is needed because RPC topics can include wildcards.
Parameters
topic
migrate
Migrate a list of {topic, newVersion, transform}. The version number in
topic will be ignored, and all versions' values will be merged, applied in
order, such that the latest version is applied last. topic may include
wildcards in the part before the version number but not after.
Example:
mqttSync.migrate([{topic: '/+/dId/@scope/capname/+/b', newVersion: '1.2.0'}]
Parameters
listonReady(optional, defaultundefined)
onBeforeDisconnect
Register a new hook to be called before disconnecting
Parameters
fn
publish
Register a listener for path in data. Make sure to populate the data before calling this or set the data all at once afterwards.
With option "atomic" this will always send the whole sub-document, not flat changes. Useful, e.g., for desiredPackages, see https://github.com/chfritz/transitive/issues/85.
Parameters
topicoptions(optional, default{atomic:false})
Returns any true if publication added (false, e.g., when already present)
publishAtLevel
Publish all values at the given level of the given object under the given topic (plus sub-key, of course).
Parameters
topicvaluelevel
queryHistory
Query a topics history (if stored). Convenience function to make RPC call
to the mqtt2clickhouse service. For details see clickhouse.queryMQTTHistory
in utils/clickhouse.
Parameters
-
paramsobjectparams.topicSelectorobject A topic with wildcards selecting what to retrieve.params.sincenumber? A time (seconds since epoch) from when on to retrieve history.params.untilnumber? A time (seconds since epoch) until when on to retrieve history.params.path[string]? A path into the payload to extract, e.g.,['a', 'b']would retrieve the value 123 from{a: {b: 123}}. Requirestype.params.typestring? Type of element to extract usingpath. For available types, see https://clickhouse.com/docs/sql-reference/data-types.params.orderBystring? anORDER BYstatement to use for sorting results.params.limitinteger? Max number of results to return, after grouping.params.binsinteger? Into how many bins to aggregate (if given, requiressince).params.aggstring? Aggregation function to use (ifaggSecondsorbinsandsinceare given). Defaults tocount(which works for any data type). See https://clickhouse.com/docs/sql-reference/aggregate-functions/reference.params.aggSecondsinteger? How many seconds to group together (alternative tobins+since).
register
Register an RPC request handler. Example:
mqttSync.register('/mySquare', (arg, commandTopic) => {
log.debug('we got request on topic', commandTopic);
log.debug('running /mySquare with args', arg);
return arg * arg;
});
Note that the command topic needs to be in the capabilities namespace like
any other topic. In robot capabilities, as usual, these can start in /
because the local mqtt bridge operated by the robot agent will place all
topics in their respective namespace. In the cloud and on the web you will
need to use the respective namespace, i.e.,
/orgId/deviceId/@scope/capName/capVersion/.
You can use wildcards in the registered topic. The handler will receive the actual, ground topic the request was made on as the second argument. This allows you to make the RPCs behavior depend on the topic.
Async/Await
Yes, you can make the handler async and use await inside of it. This
will be handled correctly, i.e., MqttSync will await the result of the
handler before responding to the RPC request client.
Parameters
commandhandler
requestHistoryStorage
Request the history of the described topics (selector with wildcards) to
be stored in ClickHouse for the ttl number of days (if the mqtt2clickhouse
service is running -- as it usually is inside the
transitiverobotics/clickhouse docker image).
Parameters
topicttl(optional, default1)
setThrottle
Set delay between processing of publishing queue in milliseconds. This allows you to effectively throttle the rate at which this instance will publish changes. Note that updates to a topic already in the queue will not cause multiple publications. Only the latest value will be published.
Parameters
delaynumber? Number of milliseconds to wait between processing of publish queue.
subscribe
Subscribe to the given topic (and all sub-topics). The callback will indicate success/failure, not a message on the topic.
Parameters
topiccallback(optional, defaultnoop)
waitForHeartbeatOnce
Register a callback for the next heartbeat from the broker
Parameters
callback
clone
Deep-clone the given object. All functionality is lost, just data is kept.
Parameters
obj
decodeJWT
Parse JWT and return the decoded payload (JSON).
Parameters
jwt
dropWildcardIds
reduce wildcards with Ids, such as +sessionId, to just +
Parameters
x
forMatchIterator
Iterate through the object and invoke callback for each match of path (with named wildcards)
Parameters
objpathcallbackpathSoFar(optional, default[])matchSoFar(optional, default{})
getDateBase52
Get a base52 representation [a-zA-Z] of the current date (ms since epoch)
getLogger
Get a new loglevel logger; call with a name, e.g., module.id. The returned
logger has methods trace, debug, info, warn, error. See
https://www.npmjs.com/package/loglevel for details.
getRandomId
Generate a random id (base36)
Parameters
bytes(optional, default6)
isPrefixOf
prefixArray is a prefix of the array
Parameters
prefixArrayarray
isSubTopicOf
sub is a strict sub-topic of parent, and in particular not equal
Parameters
subparent
mergeVersions
given an object where the keys are versions, merge this into one object where the latest version of each subfield overwrites any previous
Parameters
versionsObjectsubTopic(optional, defaultundefined)options(optional, default{})
metaPathToSelectorPath
Given a storage request topic, replace the meta-data fields into wildcards
Parameters
path
metaPathToSelectorPath
Given a meta path, return path with encoded meta fields turned into wildcards
Parameters
path
metaTopicToSelector
Given a meta topic, return topic with encoded meta fields turned into wildcards
Parameters
topic
mqttClearRetained
delete all retained messages in a certain topic prefix, waiting for a given delay to collect existing retained. Use with care, never delete topics not owned by us. Harmless within capabilities, which are namespaced already.
Parameters
mqttClientprefixescallbackdelay(optional, default1000)
parseMQTTTopic
parse an MQTT topic according to our topic schema
Parameters
topic
parseMQTTUsername
parse usernames used in MQTT
Parameters
username
pathToTopic
convert a path array to mqtt topic; reduces +id identifiers to +
Parameters
pathArray
selectFromObject
Given an object and a path with wildcards (* and +), modify the object
to only contain elements matched by the path, e.g.,
{a: {b: 1, c: 2}, d: 2} and ['a','+'] would give {a: {b: 1, c: 2}}
Parameters
objobject The object to select frompatharray An array specifying the path to select, potentially containing mqtt wildcards ('+').
selectorPathToMetaPath
Given a selector path with wildcards, return path with wildcards turned into encoded meta fields (inverse of metaTopicToSelector)
Parameters
path
selectorToMetaTopic
Given a selector topic with wildcards, return topic with wildcards turned into encoded meta fields (inverse of metaTopicToSelector)
Parameters
topic
setFromPath
Like _.set but without arrays. This allows using numbers as keys.
Parameters
objpathvalue
toBase52
Convert number to base52 [a-zA-Z]
Parameters
num
toFlatObject
Given an object, return a new flat object of topic+value pairs, e.g.:
{a: {b: 1, c: 2}, d: 3} → {'/a/b': 1, '/a/c': 2, '/d': 3}
Note: not idempotent!
{'/a/b': 1, '/a/c': 2, d: 3} → {'%2Fa%2Fb': 1, '%2Fa%2Fc': 2, '/d': 3}
Parameters
objprefix(optional, default[])rtv(optional, default{})
topicMatch
Match a slash-separated topic or path array with a selector using +XYZ for (named) wildcards. Return the matching result.
Parameters
selectortopic
topicToPath
convert topic to path array
Parameters
topic
tryJSONParse
Try parsing JSON, return null if unsuccessful
Parameters
string
unset
Unset the topic in that obj, and clean up parent if empty, recursively. Return the path to the removed node.
Parameters
objpath
updateObject
Given a modifier {"a/b/c": "xyz"} update the object obj such that
obj.a.b.c = "xyz".
Parameters
objmodifier
versionCompare
Compare to version strings. Return -1 if a is lower than b, 0 if they are equal, and 1 otherwise. If either is not a complete version, e.g., 2.0, interpret it as a range and use its minimum version for the comparison. Hence, 2.0 < 2.0.1.
Parameters
ab
visit
Reusable visitor pattern: iteratively visits all nodes in the tree
described by object, where childField indicates the child-of predicate.
Parameters
objectchildFieldvisitor
visitAncestor
Given an object and a path, visit each ancestor of the path
Parameters
objectpathvisitorprefix(optional, default[])
wait
Wait for delay ms, for use in async functions.
Parameters
delay