xmpp.chapril.org-conversejs/src/headless/converse-smacks.js
JC Brand 830e060568 Rename api.settings.update to api.settings.extend
This is to try and make it clearer that this method won't override
initialization settings, and is instead simply to add to the default
settings.
2020-06-03 09:32:16 +02:00

255 lines
10 KiB
JavaScript

/**
* @module converse-smacks
* @copyright The Converse.js contributors
* @license Mozilla Public License (MPLv2)
* @description Converse.js plugin which adds support for XEP-0198: Stream Management
*/
import { _converse, api, converse } from "./converse-core";
import log from "./log";
const { Strophe } = converse.env;
const u = converse.env.utils;
Strophe.addNamespace('SM', 'urn:xmpp:sm:3');
converse.plugins.add('converse-smacks', {
initialize () {
// Configuration values for this plugin
// ====================================
// Refer to docs/source/configuration.rst for explanations of these
// configuration settings.
api.settings.extend({
'enable_smacks': true,
'smacks_max_unacked_stanzas': 5,
});
function isStreamManagementSupported () {
if (api.connection.isType('bosh') && !_converse.isTestEnv()) {
return false;
}
return api.disco.stream.getFeature('sm', Strophe.NS.SM);
}
function handleAck (el) {
if (!_converse.session.get('smacks_enabled')) {
return true;
}
const handled = parseInt(el.getAttribute('h'), 10);
const last_known_handled = _converse.session.get('num_stanzas_handled_by_server');
const delta = handled - last_known_handled;
if (delta < 0) {
const err_msg = `New reported stanza count lower than previous. `+
`New: ${handled} - Previous: ${last_known_handled}`
log.error(err_msg);
}
const unacked_stanzas = _converse.session.get('unacked_stanzas');
if (delta > unacked_stanzas.length) {
const err_msg =
`Higher reported acknowledge count than unacknowledged stanzas. `+
`Reported Acknowledged Count: ${delta} -`+
`Unacknowledged Stanza Count: ${unacked_stanzas.length} -`+
`New: ${handled} - Previous: ${last_known_handled}`
log.error(err_msg);
}
_converse.session.save({
'num_stanzas_handled_by_server': handled,
'num_stanzas_since_last_ack': 0,
'unacked_stanzas': unacked_stanzas.slice(delta)
});
return true;
}
function sendAck() {
if (_converse.session.get('smacks_enabled')) {
const h = _converse.session.get('num_stanzas_handled');
const stanza = u.toStanza(`<a xmlns="${Strophe.NS.SM}" h="${h}"/>`);
api.send(stanza);
}
return true;
}
function stanzaHandler (el) {
if (_converse.session.get('smacks_enabled')) {
if (u.isTagEqual(el, 'iq') || u.isTagEqual(el, 'presence') || u.isTagEqual(el, 'message')) {
const h = _converse.session.get('num_stanzas_handled');
_converse.session.save('num_stanzas_handled', h+1);
}
}
return true;
}
function initSessionData () {
_converse.session.save({
'smacks_enabled': _converse.session.get('smacks_enabled') || false,
'num_stanzas_handled': _converse.session.get('num_stanzas_handled') || 0,
'num_stanzas_handled_by_server': _converse.session.get('num_stanzas_handled_by_server') || 0,
'num_stanzas_since_last_ack': _converse.session.get('num_stanzas_since_last_ack') || 0,
'unacked_stanzas': _converse.session.get('unacked_stanzas') || []
});
}
function resetSessionData () {
_converse.session && _converse.session.save({
'smacks_enabled': false,
'num_stanzas_handled': 0,
'num_stanzas_handled_by_server': 0,
'num_stanzas_since_last_ack': 0,
'unacked_stanzas': []
});
}
function saveSessionData (el) {
const data = {'smacks_enabled': true};
if (['1', 'true'].includes(el.getAttribute('resume'))) {
data['smacks_stream_id'] = el.getAttribute('id');
}
_converse.session.save(data);
return true;
}
function onFailedStanza (el) {
if (el.querySelector('item-not-found')) {
// Stream resumption must happen before resource binding but
// enabling a new stream must happen after resource binding.
// Since resumption failed, we simply continue.
//
// After resource binding, sendEnableStanza will be called
// based on the afterResourceBinding event.
log.warn('Could not resume previous SMACKS session, session id not found. '+
'A new session will be established.');
} else {
log.error('Failed to enable stream management');
log.error(el.outerHTML);
}
resetSessionData();
/**
* Triggered when the XEP-0198 stream could not be resumed.
* @event _converse#streamResumptionFailed
*/
api.trigger('streamResumptionFailed');
return true;
}
function resendUnackedStanzas () {
const stanzas = _converse.session.get('unacked_stanzas');
// We clear the unacked_stanzas array because it'll get populated
// again in `onStanzaSent`
_converse.session.save('unacked_stanzas', []);
// XXX: Currently we're resending *all* unacked stanzas, including
// IQ[type="get"] stanzas that longer have handlers (because the
// page reloaded or we reconnected, causing removal of handlers).
//
// *Side-note:* Is it necessary to clear handlers upon reconnection?
//
// I've considered not resending those stanzas, but then keeping
// track of what's been sent and ack'd and their order gets
// prohibitively complex.
//
// It's unclear how much of a problem this poses.
//
// Two possible solutions are running @converse/headless as a
// service worker or handling IQ[type="result"] stanzas
// differently, more like push stanzas, so that they don't need
// explicit handlers.
stanzas.forEach(s => api.send(s));
}
function onResumedStanza (el) {
saveSessionData(el);
handleAck(el);
resendUnackedStanzas();
_converse.connection.do_bind = false; // No need to bind our resource anymore
_converse.connection.authenticated = true;
_converse.connection.restored = true;
_converse.connection._changeConnectStatus(Strophe.Status.CONNECTED, null);
}
async function sendResumeStanza () {
const promise = u.getResolveablePromise();
_converse.connection._addSysHandler(el => promise.resolve(onResumedStanza(el)), Strophe.NS.SM, 'resumed');
_converse.connection._addSysHandler(el => promise.resolve(onFailedStanza(el)), Strophe.NS.SM, 'failed');
const previous_id = _converse.session.get('smacks_stream_id');
const h = _converse.session.get('num_stanzas_handled');
const stanza = u.toStanza(`<resume xmlns="${Strophe.NS.SM}" h="${h}" previd="${previous_id}"/>`);
api.send(stanza);
_converse.connection.flush();
await promise;
}
async function sendEnableStanza () {
if (!api.settings.get('enable_smacks') || _converse.session.get('smacks_enabled')) {
return;
}
if (await isStreamManagementSupported()) {
const promise = u.getResolveablePromise();
_converse.connection._addSysHandler(el => promise.resolve(saveSessionData(el)), Strophe.NS.SM, 'enabled');
_converse.connection._addSysHandler(el => promise.resolve(onFailedStanza(el)), Strophe.NS.SM, 'failed');
const resume = (api.connection.isType('websocket') || _converse.isTestEnv());
const stanza = u.toStanza(`<enable xmlns="${Strophe.NS.SM}" resume="${resume}"/>`);
api.send(stanza);
_converse.connection.flush();
await promise;
}
}
async function enableStreamManagement () {
if (!api.settings.get('enable_smacks')) {
return;
}
if (!(await isStreamManagementSupported())) {
return;
}
_converse.connection.addHandler(stanzaHandler);
_converse.connection.addHandler(sendAck, Strophe.NS.SM, 'r');
_converse.connection.addHandler(handleAck, Strophe.NS.SM, 'a');
if (_converse.session.get('smacks_stream_id')) {
await sendResumeStanza();
} else {
resetSessionData();
}
}
function onStanzaSent (stanza) {
if (!_converse.session) {
log.warn('No _converse.session!');
return;
}
if (!_converse.session.get('smacks_enabled')) {
return;
}
if (u.isTagEqual(stanza, 'iq') ||
u.isTagEqual(stanza, 'presence') ||
u.isTagEqual(stanza, 'message')) {
const stanza_string = Strophe.serialize(stanza);
_converse.session.save(
'unacked_stanzas',
(_converse.session.get('unacked_stanzas') || []).concat([stanza_string])
);
const max_unacked = api.settings.get('smacks_max_unacked_stanzas');
if (max_unacked > 0) {
const num = _converse.session.get('num_stanzas_since_last_ack') + 1;
if (num % max_unacked === 0) {
// Request confirmation of sent stanzas
api.send(u.toStanza(`<r xmlns="${Strophe.NS.SM}"/>`));
}
_converse.session.save({'num_stanzas_since_last_ack': num});
}
}
}
api.listen.on('userSessionInitialized', initSessionData);
api.listen.on('beforeResourceBinding', enableStreamManagement);
api.listen.on('afterResourceBinding', sendEnableStanza);
api.listen.on('send', onStanzaSent);
}
});