From e4a0028f5d31e545dc3869a42e8ec39bfdc4b4f3 Mon Sep 17 00:00:00 2001 From: Emily Hou Date: Thu, 21 Jun 2018 13:57:53 -0700 Subject: [PATCH] add streaming --- app/api.js | 41 +++++++++++++++++++++++------------------ app/ece.js | 8 +++----- server/prod.js | 2 +- server/routes/ws.js | 40 +++++++++++++++++++++++----------------- 4 files changed, 50 insertions(+), 41 deletions(-) diff --git a/app/api.js b/app/api.js index 30d0be36..12f0f64f 100644 --- a/app/api.js +++ b/app/api.js @@ -107,7 +107,8 @@ async function upload( metadata, verifierB64, keychain, - onprogress + onprogress, + canceller ) { const metadataHeader = arrayToB64(new Uint8Array(metadata)); const fileMeta = { @@ -115,23 +116,25 @@ async function upload( authorization: `send-v1 ${verifierB64}` }; - //send file header - ws.send(JSON.stringify(fileMeta)); - - function listenForRes() { + function listenForResponse() { return new Promise((resolve, reject) => { ws.addEventListener('message', function(msg) { const response = JSON.parse(msg.data); - resolve({ - url: response.url, - id: response.id, - ownerToken: response.owner - }); + if (response.error) { + reject(response.error); + } else { + resolve({ + url: response.url, + id: response.id, + ownerToken: response.owner + }); + } }); }); } - const resPromise = listenForRes(); + const resPromise = listenForResponse(); + ws.send(JSON.stringify(fileMeta)); const reader = stream.getReader(); let state = await reader.read(); @@ -139,8 +142,9 @@ async function upload( while (!state.done) { const buf = state.value; ws.send(buf); - if (ws.readyState !== 1) { - throw new Error(0); //should this be here + + if (canceller.cancelled) { + throw new Error(0); } onprogress([Math.min(streamInfo.fileSize, size), streamInfo.fileSize]); @@ -148,10 +152,10 @@ async function upload( state = await reader.read(); } - const res = await resPromise; + const response = await resPromise; ws.close(); - return res; + return response; } export async function uploadWs( @@ -166,12 +170,12 @@ export async function uploadWs( const port = window.location.port; const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; const ws = await asyncInitWebSocket(`${protocol}//${host}:${port}/api/ws`); - - //console.log(`made connection to websocket: ws://${host}:${port}/api/ws`) + const canceller = { cancelled: false }; return { cancel: function() { ws.close(4000, 'upload cancelled'); + canceller.cancelled = true; }, result: upload( ws, @@ -180,7 +184,8 @@ export async function uploadWs( metadata, verifierB64, keychain, - onprogress + onprogress, + canceller ) }; } diff --git a/app/ece.js b/app/ece.js index 1a9ecb23..c962106a 100644 --- a/app/ece.js +++ b/app/ece.js @@ -6,7 +6,7 @@ const TAG_LENGTH = 16; const KEY_LENGTH = 16; const MODE_ENCRYPT = 'encrypt'; const MODE_DECRYPT = 'decrypt'; -const RS = 1024 * 1024; +const RS = 1048576; const encoder = new TextEncoder(); @@ -16,7 +16,7 @@ function generateSalt(len) { return randSalt.buffer; } -export class ECETransformer { +class ECETransformer { constructor(mode, ikm, rs, salt) { this.mode = mode; this.prevChunk; @@ -139,7 +139,6 @@ export class ECETransformer { return Buffer.concat([Buffer.from(this.salt), nums]); } - //salt is arraybuffer, rs is int, length is int readHeader(buffer) { if (buffer.length < 21) { throw new Error('chunk too small for reading header'); @@ -259,7 +258,7 @@ class BlobSlicer { } } -export class BlobSliceStream extends ReadableStream { +class BlobSliceStream extends ReadableStream { constructor(blob, size, mode) { super(new BlobSlicer(blob, size, mode)); } @@ -272,7 +271,6 @@ mode: string, either 'encrypt' or 'decrypt' rs: int containing record size, optional salt: ArrayBuffer containing salt of KEY_LENGTH length, optional */ - export default class ECE { constructor(input, key, mode, rs, salt) { if (rs === undefined) { diff --git a/server/prod.js b/server/prod.js index 0d850eea..a59f6ec8 100644 --- a/server/prod.js +++ b/server/prod.js @@ -12,7 +12,7 @@ if (config.sentry_dsn) { const app = express(); expressWs(app, null, { perMessageDeflate: false }); -app.ws('/api/ws', require('./routes/ws')); //want to move this into routes/index.js but it's not working... +app.ws('/api/ws', require('./routes/ws')); routes(app); app.use( diff --git a/server/routes/ws.js b/server/routes/ws.js index 514060e5..4734daaa 100644 --- a/server/routes/ws.js +++ b/server/routes/ws.js @@ -10,17 +10,17 @@ const log = mozlog('send.upload'); module.exports = async function(ws, req) { let fileStream; - try { - ws.on('close', e => { - if (e !== 1000) { - if (fileStream !== undefined) { - fileStream.destroy(); - } + ws.on('close', e => { + if (e !== 1000) { + if (fileStream !== undefined) { + fileStream.destroy(); } - }); + } + }); - let first = true; - ws.on('message', function(message) { + let first = true; + ws.on('message', function(message) { + try { if (first) { const newId = crypto.randomBytes(5).toString('hex'); const owner = crypto.randomBytes(10).toString('hex'); @@ -29,11 +29,13 @@ module.exports = async function(ws, req) { const metadata = fileInfo.fileMetadata; const auth = fileInfo.authorization; - /* if (!metadata || !auth) { - return res.sendStatus(400); + ws.send( + JSON.stringify({ + error: 400 + }) + ); } - */ const meta = { owner, @@ -60,9 +62,13 @@ module.exports = async function(ws, req) { first = false; } - }); - } catch (e) { - log.error('upload', e); - //res.sendStatus(500); - } + } catch (e) { + log.error('upload', e); + ws.send( + JSON.stringify({ + error: 500 + }) + ); + } + }); };