Binary data over Phoenix sockets

Posted by 毁我爱他你真棒 on 2016-11-28 07:34:47
TL;DR

   How to send raw binary data through channels in the Phoenix framework and read the data on the client. We will be using the MessagePack format to deliver the payload, gzipping when it makes sense to do so.
  MessagePack you say? Well, what's wrong with JSON?

   Nothing! No, really! Except... it could be leaner. You see, over here at Stoiximan we are delivering lots and lots of data to our customers, so even minor gains from each payload translate to lots of bytes saved in the end, both from our customers' mobile data plans and from our server loads. Plus all the cool kids use raw bytes and we should too.
  You mentioned gzipping above. Everybody and their moms know that gzipped JSON data are smaller than their msgpacked+gzipped counterparts!

   True! But our payloads are usually small enough that gzipping does not make sense. So most of the time the data we send will only be msgpacked, and msgpack sizes are (generally speaking) smaller than JSON sizes. If your data do not fit this description and your app is chunky rather than chatty, then by all means use JSON. That's right, this does mean less work for you!
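
   To make the size claim concrete, here is a rough sketch that packs a small channel-style message and compares it with its JSON form. The packSmall function is hypothetical and covers only a tiny subset of MessagePack (nil, booleans, integers 0..127, short ASCII strings, small arrays and maps). It is not the message_pack library used on the server, just enough to count bytes.

```javascript
// Minimal MessagePack encoder for a SUBSET of types (illustration only).
// Returns a plain array of byte values.
function packSmall(value) {
  if (value === null) return [0xC0];
  if (value === false) return [0xC2];
  if (value === true) return [0xC3];
  if (typeof value === "number") {
    if (!Number.isInteger(value) || value < 0 || value > 0x7F) {
      throw new Error("sketch only handles integers 0..127");
    }
    return [value]; // positive fixint: the byte is the value itself
  }
  if (typeof value === "string") {
    if (value.length > 31) throw new Error("sketch only handles short strings");
    const bytes = [0xA0 + value.length]; // fixstr: 0xA0 + byte length
    for (let i = 0; i < value.length; i++) {
      const code = value.charCodeAt(i);
      if (code > 0x7F) throw new Error("sketch only handles ASCII");
      bytes.push(code);
    }
    return bytes;
  }
  if (Array.isArray(value)) {
    if (value.length > 15) throw new Error("sketch only handles small arrays");
    let bytes = [0x90 + value.length]; // fixarray: 0x90 + element count
    value.forEach(item => { bytes = bytes.concat(packSmall(item)); });
    return bytes;
  }
  // anything else is treated as a plain object
  const keys = Object.keys(value);
  if (keys.length > 15) throw new Error("sketch only handles small maps");
  let bytes = [0x80 + keys.length]; // fixmap: 0x80 + entry count
  keys.forEach(key => {
    bytes = bytes.concat(packSmall(key), packSmall(value[key]));
  });
  return bytes;
}

const sample = {topic: "test:lobby", event: "small_reply", payload: {ok: true}};
const jsonSize = JSON.stringify(sample).length; // ASCII only, so chars == bytes
const msgpackSize = packSmall(sample).length;
console.log("JSON bytes:", jsonSize, "MessagePack bytes:", msgpackSize);
```

   The saving comes mostly from dropping quotes, colons, commas and braces: every short string or small map costs a single header byte instead of delimiters.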
  Anything else I should know before I do this?

   The techniques described here will only work with fairly modern browsers, i.e. those supporting websockets (which allow for raw byte sending and receiving) and TypedArrays, so IE 10 and above is what we're targeting here. If you need to support older browsers you will have to connect with the longpolling transport and send JSON data (possibly gzipped+base64'd, i.e. text data, but you should take the 4:3 base64 expansion into account before gzipping), doing some capability detection work to find out what's supported. None of this will be shown here, to keep this post focused.
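
   For readers who do need the fallback, the capability check itself can be quite small. The sketch below is one possible shape, not something taken from the original project: pickTransport and its return values are made-up names, and it takes a global-like object so it can be tried outside a browser (pass window in real code).

```javascript
// Decide which transport/encoding pair a client could use.
// `env` is a global-like object; in a browser you would pass `window`.
function pickTransport(env) {
  const hasWebSocket = Boolean(env.WebSocket);
  const hasTypedArrays = Boolean(env.ArrayBuffer) && Boolean(env.Uint8Array);
  if (hasWebSocket && hasTypedArrays) {
    return "websocket+msgpack"; // raw binary frames are possible
  }
  return "longpoll+json"; // text-only fallback for older browsers
}

// A stand-in for a modern browser's globals
const modern = {WebSocket: function () {}, ArrayBuffer: ArrayBuffer, Uint8Array: Uint8Array};
console.log(pickTransport(modern)); // websocket+msgpack
console.log(pickTransport({}));     // longpoll+json
```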
  OK, you got me somewhat interested and I'm not scared by the extra work, how do I do it?

   Go ahead and create a new folder named binary_data_over_phoenix_sockets/, cd inside and create a sample project with mix phoenix.new . --no-ecto from the command line. Press Y when asked to fetch dependencies. Open up mix.exs and add msgpack-elixir as a dependency:
    # file: mix.exs
    defp deps do
      [
        # ...other deps go here
        {:message_pack, "~> 0.2.0"}
      ]
    end
  Then run mix deps.get to download it.
   The first thing we need to do now is create our own msgpack serializer that we'll use with phoenix transports, looking of course at the default WebSocketSerializer.ex implementation for inspiration. Create the folder web/transports and add a new file message_pack_serializer.ex:
    # file: web/transports/message_pack_serializer.ex
    defmodule BinaryDataOverPhoenixSockets.Transports.MessagePackSerializer do
      @moduledoc false
      @behaviour Phoenix.Transports.Serializer
      alias Phoenix.Socket.Reply
      alias Phoenix.Socket.Message
      alias Phoenix.Socket.Broadcast
      # only gzip data above 1K
      @gzip_threshold 1024
      def fastlane!(%Broadcast{} = msg) do
        {:socket_push, :binary, pack_data(%{
          topic: msg.topic,
          event: msg.event,
          payload: msg.payload
        })}
      end
      def encode!(%Reply{} = reply) do
        packed = pack_data(%{
          topic: reply.topic,
          event: "phx_reply",
          ref: reply.ref,
          payload: %{status: reply.status, response: reply.payload}
        })
        {:socket_push, :binary, packed}
      end
      def encode!(%Message{} = msg) do
        # We need to convert the Message struct into a plain map for MessagePack to work properly.
        # Alternatively we could have implemented the Enumerable protocol. Pick your poison :)
        {:socket_push, :binary, pack_data(Map.from_struct(msg))}
      end
      # messages received from the clients are still in json format;
      # for our use case clients are mostly passive listeners and it made no sense
      # to optimize incoming traffic
      def decode!(message, _opts) do
        message
        |> Poison.decode!()
        |> Phoenix.Socket.Message.from_map!()
      end
      defp pack_data(data) do
        msgpacked = MessagePack.pack!(data, enable_string: true)
        gzip_data(msgpacked, byte_size(msgpacked))
      end
      # payloads below the threshold are sent as plain msgpack
      defp gzip_data(data, size) when size < @gzip_threshold, do: data
      defp gzip_data(data, _size), do: :zlib.gzip(data)
    end
  Anyone who bothered to click on the default implementation link will immediately see that the code is essentially the same, except now we're using MessagePack to serialize the payload (duh) and telling phoenix that payloads are :binary not :text data (which relates to websocket frame handling).
   Now open the web/channels/user_socket.ex file and override the default serializer with our own:
    # file: web/channels/user_socket.ex
    ...
      transport :websocket, Phoenix.Transports.WebSocket,
        serializer: BinaryDataOverPhoenixSockets.Transports.MessagePackSerializer
    ...
  We will also need a channel to do the actual talking, so go ahead and run mix phoenix.gen.channel Test in the command line, which will create the file web/channels/test_channel.ex. Follow the suggestion and add this channel to user_socket.ex:
    # file: web/channels/user_socket.ex
    # add this line above the transport
    channel "test:lobby", BinaryDataOverPhoenixSockets.TestChannel
  Note: depending on your phoenix version, you may need to run mix phoenix.gen.channel Test lobby instead.
   We should now add two callbacks that return some sample responses to showcase both plain and gzipped payload delivery. Open up test_channel.ex and add the following lines inside the TestChannel module:
    # file: web/channels/test_channel.ex
    def handle_in("small", _payload, socket) do
      push socket, "small_reply", %{"small response that will only be msgpacked" => true}
      {:noreply, socket}
    end
    def handle_in("large", _payload, socket) do
      # 1000 integers pack to roughly 3KB, which is above the 1K gzip threshold
      push socket, "large_reply", %{"large response that will be msgpacked+gzipped" => Enum.map(1..1000, fn _ -> 1000 end)}
      {:noreply, socket}
    end
  And that wraps up the server portion of things! Run mix phoenix.server to fire up the app, open your modern browser and navigate to http://localhost:4000
  The client

  Time to get crackin' on the client. First things first: we need to gunzip data sent from the server (if gzipped), and we need to unpack the msgpacked message. Unfortunately there is no native browser support for any of these things, so we'll need to get a little creative.
  Unzipping:

   We will be using a stripped down version of imaya's zlib.js, since we only need the unzipping part. Go ahead and create the file gunzip.js under web/static/js/vendor/ and paste in the contents from the URL given in the listing:
    // file: web/static/js/vendor/gunzip.js
    // Paste in here the contents from https://github.com/StoiximanServices/blog/blob/master/binary_data_over_phoenix_sockets/web/static/js/vendor/gunzip.js
Unpacking:

   Again we only need to unpack messages on the client, no need for packing. Searching around did not produce anything that fit our needs, so we get to roll our own MessagePack decoder. Yay! Don't worry, the MessagePack spec is pretty straightforward (no need for lookahead/lookbehind), so modeling the decoder with a state machine should not pose too much of a problem (not to mention that this should be fast as well!).
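
   The reason no lookahead is needed is that the first byte of every value tells the decoder what comes next. A small sketch of that classification, using a hypothetical formatFamily helper that is not part of the decoder itself:

```javascript
// Classify a MessagePack value by its first byte alone.
function formatFamily(byte) {
  if (byte <= 0x7F) return "positive fixint"; // value is the byte itself
  if (byte <= 0x8F) return "fixmap";          // low 4 bits: entry count
  if (byte <= 0x9F) return "fixarray";        // low 4 bits: element count
  if (byte <= 0xBF) return "fixstr";          // low 5 bits: byte length
  if (byte >= 0xE0) return "negative fixint"; // value is byte - 256
  return "single-byte tag";                   // nil, bool, bin, ext, float, int, str, array, map
}

// {"a": 1} packs to these four bytes
const packedBytes = [0x81, 0xA1, 0x61, 0x01];
console.log(formatFamily(packedBytes[0]), "with", packedBytes[0] - 0x80, "entry");
console.log(formatFamily(packedBytes[1]), "of length", packedBytes[1] - 0xA0);
console.log(formatFamily(packedBytes[3]));
```

   Each branch of the decoder only has to read the header byte, optionally a fixed-width length, and then hand the remaining bytes back to the same routine.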
   Create file web/static/js/msgpack.js and put the following inside:
    // file: web/static/js/msgpack.js
    let formats = {
      positiveFixIntStart: 0x00,
      positiveFixIntEnd: 0x7F,
      fixMapStart: 0x80,
      fixMapEnd: 0x8F,
      fixArrStart: 0x90,
      fixArrEnd: 0x9F,
      fixStrStart: 0xA0,
      fixStrEnd: 0xBF,
      nil: 0xC0,
      none: 0xC1,
      bFalse: 0xC2,
      bTrue: 0xC3,
      bin8: 0xC4,
      bin16: 0xC5,
      bin32: 0xC6,
      ext8: 0xC7,
      ext16: 0xC8,
      ext32: 0xC9,
      float32: 0xCA,
      float64: 0xCB,
      uint8: 0xCC,
      uint16: 0xCD,
      uint32: 0xCE,
      uint64: 0xCF,
      int8: 0xD0,
      int16: 0xD1,
      int32: 0xD2,
      int64: 0xD3,
      fixExt1: 0xD4,
      fixExt2: 0xD5,
      fixExt4: 0xD6,
      fixExt8: 0xD7,
      fixExt16: 0xD8,
      str8: 0xD9,
      str16: 0xDA,
      str32: 0xDB,
      array16: 0xDC,
      array32: 0xDD,
      map16: 0xDE,
      map32: 0xDF,
      negativeFixIntStart: 0xE0,
      negativeFixIntEnd: 0xFF
    }
    /*
    Decode returns two element [pos, data] arrays: index 0 holds the new position of the parser, and index 1 contains the parsed data. We carry around the original
    binary data array to avoid copying to new slices, while updating the parser position and recursively calling decode until we've consumed all the buffer.
    Missing from this implementation is extension support- add it if you need it.
    Note that 64 bit integers are only exact up to 2^53, since they end up as regular JavaScript numbers.
    */
    let decode = function(binaryData, start){
      start = start || 0;
      let format = binaryData[start];
      if(format <= formats.positiveFixIntEnd){
        return [start + 1, format - formats.positiveFixIntStart];
      }
      if(format <= formats.fixMapEnd){
        let keyCount = format - formats.fixMapStart;
        return parseMap(binaryData, keyCount, start + 1);
      }
      if(format <= formats.fixArrEnd){
        let len = format - formats.fixArrStart;
        return parseArray(binaryData, len, start + 1);
      }
      if(format <= formats.fixStrEnd){
        let len = format - formats.fixStrStart;
        return parseUtf8String(binaryData, len, start + 1);
      }
      let pos, len;
      switch(format){
        case formats.nil:
          return [start + 1, null];
        case formats.bFalse:
          return [start + 1, false];
        case formats.bTrue:
          return [start + 1, true];
        case formats.bin8:
          [pos, len] = parseUint(binaryData, 8, start + 1);
          return parseBinaryArray(binaryData, len, pos);
        case formats.bin16:
          [pos, len] = parseUint(binaryData, 16, start + 1);
          return parseBinaryArray(binaryData, len, pos);
        case formats.bin32:
          [pos, len] = parseUint(binaryData, 32, start + 1);
          return parseBinaryArray(binaryData, len, pos);
        case formats.float32:
          return parseFloat(binaryData, 32, start + 1);
        case formats.float64:
          return parseFloat(binaryData, 64, start + 1);
        case formats.uint8:
          return parseUint(binaryData, 8, start + 1);
        case formats.uint16:
          return parseUint(binaryData, 16, start + 1);
        case formats.uint32:
          return parseUint(binaryData, 32, start + 1);
        case formats.uint64:
          return parseUint(binaryData, 64, start + 1);
        case formats.int8:
          return parseInt(binaryData, 8, start + 1);
        case formats.int16:
          return parseInt(binaryData, 16, start + 1);
        case formats.int32:
          return parseInt(binaryData, 32, start + 1);
        case formats.int64:
          return parseInt(binaryData, 64, start + 1);
        case formats.str8:
          [pos, len] = parseUint(binaryData, 8, start + 1);
          return parseUtf8String(binaryData, len, pos);
        case formats.str16:
          [pos, len] = parseUint(binaryData, 16, start + 1);
          return parseUtf8String(binaryData, len, pos);
        case formats.str32:
          [pos, len] = parseUint(binaryData, 32, start + 1);
          return parseUtf8String(binaryData, len, pos);
        case formats.array16:
          [pos, len] = parseUint(binaryData, 16, start + 1);
          return parseArray(binaryData, len, pos);
        case formats.array32:
          [pos, len] = parseUint(binaryData, 32, start + 1);
          return parseArray(binaryData, len, pos);
        case formats.map16:
          [pos, len] = parseUint(binaryData, 16, start + 1);
          return parseMap(binaryData, len, pos);
        case formats.map32:
          [pos, len] = parseUint(binaryData, 32, start + 1);
          return parseMap(binaryData, len, pos);
      }
      if(format >= formats.negativeFixIntStart && format <= formats.negativeFixIntEnd){
        return [start + 1, - (formats.negativeFixIntEnd - format + 1)];
      }
      throw new Error("I don't know how to decode format ["+format+"]");
    }
    function parseMap(binaryData, keyCount, start){
      let ret = {};
      let pos = start;
      for(let i = 0; i < keyCount; i++){
        let [keypos, key] = decode(binaryData, pos);
        pos = keypos;
        let [valpos, value] = decode(binaryData, pos);
        pos = valpos;
        ret[key] = value;
      }
      return [pos, ret];
    }
    function parseArray(binaryData, length, start){
      let ret = [];
      let pos = start;
      for(let i = 0; i < length; i++){
        let [newpos, data] = decode(binaryData, pos);
        pos = newpos;
        ret.push(data);
      }
      return [pos, ret];
    }
    // length is given in bits; bytes are read in big-endian order
    function parseUint(binaryData, length, start){
      let num = 0;
      let pos = start;
      let count = length;
      while (count > 0){
        count -= 8;
        // multiply instead of shifting: << works on signed 32 bit values only
        num = num * 256 + binaryData[pos];
        pos++;
      }
      return [pos, num];
    }
    function parseInt(binaryData, length, start){
      if(length === 64){
        // combine a signed high word with an unsigned low word
        let [, hi] = parseInt(binaryData, 32, start);
        let [pos64, lo] = parseUint(binaryData, 32, start + 4);
        return [pos64, hi * 4294967296 + lo];
      }
      let [pos, unum] = parseUint(binaryData, length, start);
      // two's complement: values with the top bit set are negative
      let limit = Math.pow(2, length - 1);
      return [pos, unum >= limit ? unum - 2 * limit : unum];
    }
    function parseBinaryArray(binaryData, length, start){
      let m = binaryData.subarray || binaryData.slice;
      let pos = start + length;
      return [pos, m.call(binaryData, start, pos)];
    }
    function parseFloat(binaryData, length, start){
      let bytecount = length / 8;
      let view = new DataView(new ArrayBuffer(bytecount));
      // copy the float's bytes, which begin at `start`, into the view
      for(let i = 0; i < bytecount; i++){
        view.setUint8(i, binaryData[start + i]);
      }
      let fnName = "getFloat"+length;
      let result = view[fnName](0, false);
      return [start + bytecount, result];
    }
    function parseUtf8String(data, length, start){
      //based on https://gist.github.com/boushley/5471599
      var result = [];
      var i = start;
      var c = 0;
      var c2 = 0;
      var c3 = 0;
      var c4 = 0;
      // If we have a BOM skip it
      if (length >= 3 && data[i] === 0xef && data[i+1] === 0xbb && data[i+2] === 0xbf) {
        i += 3;
      }
      let mark = length + start;
      while (i < mark) {
        c = data[i];
        if (c < 128) {
          result.push(String.fromCharCode(c));
          i++;
        } else if (c > 191 && c < 224) {
          if( i+1 >= data.length ) {
            throw new Error("UTF-8 Decode failed. Two byte character was truncated.");
          }
          c2 = data[i+1];
          result.push(String.fromCharCode( ((c&31)<<6) | (c2&63) ));
          i += 2;
        } else if (c > 223 && c < 240) {
          if (i+2 >= data.length) {
            throw new Error("UTF-8 Decode failed. Multi byte character was truncated.");
          }
          c2 = data[i+1];
          c3 = data[i+2];
          result.push(String.fromCharCode( ((c&15)<<12) | ((c2&63)<<6) | (c3&63) ));
          i += 3;
        } else if (c > 239 && c < 248) {
          if (i+3 >= data.length) {
            throw new Error("UTF-8 Decode failed. Multi byte character was truncated.");
          }
          c2 = data[i+1];
          c3 = data[i+2];
          c4 = data[i+3];
          // four byte sequences lie outside the BMP and need a surrogate pair
          let cp = (((c&7)<<18) | ((c2&63)<<12) | ((c3&63)<<6) | (c4&63)) - 0x10000;
          result.push(String.fromCharCode(0xD800 + (cp >> 10), 0xDC00 + (cp & 0x3FF)));
          i += 4;
        } else {
          throw new Error("UTF-8 Decode failed. Invalid leading byte ["+c+"]");
        }
      }
      return [mark, result.join('')];
    }
    let msgpack = {
      decode: function(binaryArray){
        return decode(binaryArray)[1];
      }
    }
    export default msgpack
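
   The number handling is the part of a decoder like this that is easiest to get subtly wrong in JavaScript, because the bitwise operators work on signed 32 bit values. The standalone sketch below shows one way to read big-endian integers and floats without them. The helper names (readUintBE, toSigned, readFloatBE) are made up for this example and take sizes in bytes or bits as noted.

```javascript
// Read an unsigned big-endian integer made of `byteCount` bytes.
// Multiplying keeps the result positive where `<<` would overflow.
function readUintBE(bytes, start, byteCount) {
  let num = 0;
  for (let i = 0; i < byteCount; i++) {
    num = num * 256 + bytes[start + i];
  }
  return num;
}

// Reinterpret an unsigned value of `bits` width (8, 16 or 32)
// as a two's complement signed integer.
function toSigned(unsigned, bits) {
  const limit = Math.pow(2, bits - 1);
  return unsigned >= limit ? unsigned - 2 * limit : unsigned;
}

// Read a big-endian IEEE 754 float of 32 or 64 bits.
function readFloatBE(bytes, start, bits) {
  const byteCount = bits / 8;
  const view = new DataView(new ArrayBuffer(byteCount));
  for (let i = 0; i < byteCount; i++) {
    view.setUint8(i, bytes[start + i]);
  }
  return bits === 32 ? view.getFloat32(0, false) : view.getFloat64(0, false);
}

const allOnes = [0xFF, 0xFF, 0xFF, 0xFF];
console.log(readUintBE(allOnes, 0, 4));               // 4294967295
console.log(0xFF << 24);                              // -16777216, the shift overflowed
console.log(toSigned(readUintBE(allOnes, 0, 4), 32)); // -1
console.log(readFloatBE([0x3F, 0xC0, 0x00, 0x00], 0, 32)); // 1.5
```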
Phew! That was a mouthful. We're now ready to intercept the socket's incoming messages and use our decoder for parsing the data.
   Again, we're going to look to the default implementation for inspiration.
   We'll start by abstracting the msgpack parsing into its own module for reusability and then use it inside socket.js. So start by creating a new file web/static/js/binarySocket.js and add the following:
    // file: web/static/js/binarySocket.js
    import msgpack from "./msgpack"
    /*lots of console.log() statements for educational purposes in this file, don't forget to remove them in production*/
    function convertToBinary(socket){
      let parentOnConnOpen = socket.onConnOpen;
      socket.onConnOpen = function(){
        //setting this to arraybuffer saves us from having to deal with blobs
        this.conn.binaryType = 'arraybuffer';
        parentOnConnOpen.apply(this, arguments);
      }
      //we also need to override the onConnMessage function, where we'll be checking
      //for binary data, and delegate to the default implementation if it's not what we expected
      let parentOnConnMessage = socket.onConnMessage;
      socket.onConnMessage = function (rawMessage){
        if(!(rawMessage.data instanceof window.ArrayBuffer)){
          return parentOnConnMessage.apply(this, arguments);
        }
        let msg = decodeMessage(rawMessage.data);
        let topic = msg.topic;
        let event = msg.event;
        let payload = msg.payload;
        let ref = msg.ref;
        this.log("receive", (payload.status || "") + " " + topic + " " + event + " " + (ref && "(" + ref + ")" || ""), payload);
        this.channels.filter(function (channel) {
          return channel.isMember(topic);
        }).forEach(function (channel) {
          return channel.trigger(event, payload, ref);
        });
        this.stateChangeCallbacks.message.forEach(function (callback) {
          return callback(msg);
        });
      }
      return socket;
    }
    function decodeMessage(rawdata){
      if(!rawdata){
        return;
      }
      let binary = new Uint8Array(rawdata);
      let data;
      //check for gzip magic bytes
      if(binary.length > 2 && binary[0] === 0x1F && binary[1] === 0x8B){
        let inflate = new window.Zlib.Gunzip(binary);
        data = inflate.decompress();
        console.log('received', binary.length, 'Bytes of gzipped data,', data.length, 'Bytes after inflating');
      }
      else{
        console.log('received', binary.length, 'Bytes of plain msgpacked data');
        data = binary;
      }
      let msg = msgpack.decode(data);
      return msg;
    }
    export default {
      convertToBinary
    }
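
   Two ideas from the listing above can be tried in isolation: wrapping an existing handler so that non-binary frames still reach the original code, and sniffing the gzip magic bytes (0x1F 0x8B) to decide whether a frame needs inflating. The sketch below uses a stand-in object rather than a real Phoenix socket, and wrapOnMessage and isGzipped are illustrative names only.

```javascript
// True when the bytes start with the gzip magic number.
function isGzipped(bytes) {
  return bytes.length > 2 && bytes[0] === 0x1F && bytes[1] === 0x8B;
}

// Replace socket.onConnMessage with a version that handles ArrayBuffer
// frames itself and delegates everything else to the original handler.
function wrapOnMessage(socket, handleBinary) {
  const original = socket.onConnMessage;
  socket.onConnMessage = function (rawMessage) {
    if (!(rawMessage.data instanceof ArrayBuffer)) {
      return original.apply(this, arguments);
    }
    return handleBinary.call(this, new Uint8Array(rawMessage.data));
  };
  return socket;
}

// Stand-in for a socket: it only records what it was given.
const seen = [];
const fakeSocket = {
  onConnMessage(msg) { seen.push("text:" + msg.data); }
};
wrapOnMessage(fakeSocket, bytes => seen.push(isGzipped(bytes) ? "gzip" : "plain"));

fakeSocket.onConnMessage({data: '{"event":"phx_reply"}'});
fakeSocket.onConnMessage({data: new Uint8Array([0x1F, 0x8B, 0x08]).buffer});
fakeSocket.onConnMessage({data: new Uint8Array([0x81, 0xA1, 0x61, 0x01]).buffer});
console.log(seen);
```

   A plain msgpack message cannot be mistaken for gzip here, because the serializer always sends a map at the top level and a map never starts with 0x1F.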
  Then let's head over to web/static/js/socket.js and replace all contents with the following:
    // file: web/static/js/socket.js
    import {Socket} from "phoenix"
    import binarySocket from "./binarySocket"
    /*the type=msgpack param is only added to distinguish this connection
    from the phoenix live reload connection in the browser's network tab*/
    let socket = new Socket("/socket", {params: {type: "msgpack"}})
    socket = binarySocket.convertToBinary(socket);
    socket.connect()
    // let's join the lobby
    let channel = socket.channel("test:lobby", {})
    channel.on("small_reply", function(data){
      console.log("small reply: server responded with", data);
    })
    channel.on("large_reply", function(data){
      console.log("large reply: server responded with", data);
    })
    channel.join()
      .receive("ok", resp => {
        console.log("Joined successfully", resp)
        channel.push("small")
        channel.push("large")
      })
      .receive("error", resp => { console.log("Unable to join", resp) })
    export default socket
  Lastly we'll need to import socket.js contents into our app.js file. Open up web/static/js/app.js and uncomment the following line:
    // file: web/static/js/app.js
    import socket from "./socket"
  Your browser should have refreshed quite a few times by now; let's open the dev tools (F12), head over to the Network tab, switch to websocket frames debugging while keeping the console window open and then refresh your page once more. Make sure you're debugging the type=msgpack websocket connection (the other one is for phoenix's own live reload feature).
  If everything worked, you should be seeing the binary frames sent from the server, the text (JSON) frames sent from the client (remember that all the work we did was for the server to be able to send msgpacked data), and the messages in the console happily notifying us every time a response is received (heartbeats for the most part).
   


  That's all folks! We hope you've enjoyed the post and we wish you lots of happy Elixir coding!
   P.S. All code and tests are of course on GitHub.