Serialize message sending per-recipient

Add a pendingMessages object to MessageSender. This object holds
one promise per recipient number. We init this promise with
Promise.resolve(), and chain on promises for message sending, replacing
the previous promise with the newly chained promise each time. If the
current promise resolves and finds that it is still the last promise
in the chain, it removes itself.
This commit is contained in:
lilia 2015-10-19 13:52:44 -07:00
parent 4970cbeaed
commit a2c7ac0df9
2 changed files with 46 additions and 16 deletions

View file

@ -39634,6 +39634,7 @@ textsecure.MessageReceiver.prototype = {
*/
function MessageSender(url, username, password) {
this.server = new TextSecureServer(url, username, password);
this.pendingMessages = {};
}
MessageSender.prototype = {
@ -39736,7 +39737,7 @@ MessageSender.prototype = {
return textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) {
if (devicesForNumber.length == 0)
return registerError(number, "Got empty device list when loading device keys", null);
doSendMessage(number, devicesForNumber, recurse);
return doSendMessage(number, devicesForNumber, recurse);
});
}
};
@ -39795,9 +39796,9 @@ MessageSender.prototype = {
}));
}
p.then(function() {
return p.then(function() {
var resetDevices = ((error.code == 410) ? error.response.staleDevices : error.response.missingDevices);
getKeysForNumber(number, resetDevices)
return getKeysForNumber(number, resetDevices)
.then(reloadDevicesAndSend(number, (error.code == 409)))
.catch(function(error) {
registerError(number, "Failed to reload device keys", error);
@ -39809,8 +39810,8 @@ MessageSender.prototype = {
});
}.bind(this);
numbers.forEach(function(number) {
textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) {
function sendToNumber(number) {
return textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) {
return Promise.all(devicesForNumber.map(function(device) {
return textsecure.protocol_wrapper.hasOpenSession(device.encodedNumber).then(function(result) {
if (!result)
@ -39819,17 +39820,31 @@ MessageSender.prototype = {
})).then(function() {
return textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) {
if (devicesForNumber.length == 0) {
getKeysForNumber(number, [1])
return getKeysForNumber(number, [1])
.then(reloadDevicesAndSend(number, true))
.catch(function(error) {
registerError(number, "Failed to retreive new device keys for number " + number, error);
});
} else
doSendMessage(number, devicesForNumber, true);
return doSendMessage(number, devicesForNumber, true);
});
});
});
});
}
numbers.forEach(function(number) {
var sendPrevious = this.pendingMessages[number] || Promise.resolve();
var sendCurrent = this.pendingMessages[number] = sendPrevious.then(function() {
return sendToNumber(number);
}).catch(function() {
return sendToNumber(number);
});
sendCurrent.then(function() {
if (this.pendingMessages[number] === sendCurrent) {
delete this.pendingMessages[number];
}
}.bind(this));
}.bind(this));
},
sendIndividualProto: function(number, proto, timestamp) {

View file

@ -3,6 +3,7 @@
*/
function MessageSender(url, username, password) {
this.server = new TextSecureServer(url, username, password);
this.pendingMessages = {};
}
MessageSender.prototype = {
@ -105,7 +106,7 @@ MessageSender.prototype = {
return textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) {
if (devicesForNumber.length == 0)
return registerError(number, "Got empty device list when loading device keys", null);
doSendMessage(number, devicesForNumber, recurse);
return doSendMessage(number, devicesForNumber, recurse);
});
}
};
@ -164,9 +165,9 @@ MessageSender.prototype = {
}));
}
p.then(function() {
return p.then(function() {
var resetDevices = ((error.code == 410) ? error.response.staleDevices : error.response.missingDevices);
getKeysForNumber(number, resetDevices)
return getKeysForNumber(number, resetDevices)
.then(reloadDevicesAndSend(number, (error.code == 409)))
.catch(function(error) {
registerError(number, "Failed to reload device keys", error);
@ -178,8 +179,8 @@ MessageSender.prototype = {
});
}.bind(this);
numbers.forEach(function(number) {
textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) {
function sendToNumber(number) {
return textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) {
return Promise.all(devicesForNumber.map(function(device) {
return textsecure.protocol_wrapper.hasOpenSession(device.encodedNumber).then(function(result) {
if (!result)
@ -188,17 +189,31 @@ MessageSender.prototype = {
})).then(function() {
return textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) {
if (devicesForNumber.length == 0) {
getKeysForNumber(number, [1])
return getKeysForNumber(number, [1])
.then(reloadDevicesAndSend(number, true))
.catch(function(error) {
registerError(number, "Failed to retreive new device keys for number " + number, error);
});
} else
doSendMessage(number, devicesForNumber, true);
return doSendMessage(number, devicesForNumber, true);
});
});
});
});
}
numbers.forEach(function(number) {
var sendPrevious = this.pendingMessages[number] || Promise.resolve();
var sendCurrent = this.pendingMessages[number] = sendPrevious.then(function() {
return sendToNumber(number);
}).catch(function() {
return sendToNumber(number);
});
sendCurrent.then(function() {
if (this.pendingMessages[number] === sendCurrent) {
delete this.pendingMessages[number];
}
}.bind(this));
}.bind(this));
},
sendIndividualProto: function(number, proto, timestamp) {