Connector is used to create tcp/udp/fifo connection quickly with same interface. It simulate block api call for tcp/fifo, so it’s easy for you to write functions without callback hell.
Note: All TCP and UDP connections created through connector automatically use callback_self_first = true to avoid circular references and improve memory safety.
cli = connector.connect(url)create a logic connection to fifo/tcp/udp
serv = connector.bind(url)bind a listener of fifo/tcp/udp
filename = connector.tmpfifoname()create a temporary fifo file that will not conflict with others.
fifo:<file path>tcp://host:portudp://host:portcli:send(buf) (fifo/tcp)yield until buf sent, if #buf is too big, it will be divided to parts (fifo MAX_LINE_SIZE = 8192).
stream = cli:receive(expect_length?) (fifo/tcp)yield to wait for expect data ready for read, return the read stream (fan.stream) on read ready, the default expect_length is 1.
cli:close()(fifo/tcp)close this connection.
cli:send(buf) (udp with embedded private protocol)send buf on background, using embedded private protocol to control squence, buf size limit around 65536 * (MTU payload len), support 33.75MB (65536 * (576 - 8 - 20 - 8)) over internet at least.
cli.onread = function(cli, buf) end (udp with embedded private protocol)input buffer callback, using embedded private protocol to control squence, when all parts of the buffer received, this callback will be invoked.
Note: Since connector uses callback_self_first = true, the connection object is passed as the first parameter to avoid circular references.
cli.onsent = function(cli, package) end (udp with embedded private protocol)output buffer sent callback, when all parts of the buffer have been sent, this callback will be invoked.
cli.ontimeout = function(cli, package) end (udp with embedded private protocol)output buffer timeout callback, if callback return false, the package will be dropped, otherwise, the package will be resend.
serv.onaccept = function(cli) endLinux XXXXXX 4.4.0-70-generic #91-Ubuntu SMP Wed Mar 22 12:47:43 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
Intel(R) Core(TM) i7-2640M CPU @ 2.80GHz
MemTotal:        3922816 kB
listen on fifo file, and service echo request.
count=118723 speed=59361.918
count=237557 speed=59417.623
count=356271 speed=59357.149
count=475312 speed=59521.302
count=593905 speed=59297.002
count=711984 speed=59040.119
count=830336 speed=59176.649
count=949218 speed=59441.624
count=1067916 speed=59349.566
local fan = require "fan"
local connector = require "fan.connector"
local utils = require "fan.utils"
local fifoname = connector.tmpfifoname()
local url = "fifo:" .. fifoname
local data = string.rep("abc", 1) -- change 1024 to 1 to test ping/pong performance.
local co = coroutine.create(function()
    serv = connector.bind(url)
    serv.onaccept = function(apt)
        -- apt.simulate_send_block = false
        print("onaccept")
        while true do
            local input = apt:receive()
            if not input then
                break
            end
            local buf = input:GetBytes()
            apt:send(buf)
        end
    end
end)
print(coroutine.resume(co))
fan.loop(function()
    cli = connector.connect(url)
    -- cli.simulate_send_block = false
    print(cli:send(data))
    local count = 0
    local last_count = 0
    local last_time = utils.gettime()
    coroutine.wrap(function()
        while true do
          fan.sleep(2)
          print(string.format("count=%d speed=%1.03f", count, (count - last_count) / (utils.gettime() - last_time)))
          last_time = utils.gettime()
          last_count = count
        end
    end)()
    while true do
        local input = cli:receive()
        if not input then
            break
        end
        local buf = input:GetBytes()
        count = count + 1
        cli:send(data)
     end
end)
listen on tcp port, and service echo request.
count=75811 speed=37907.827
count=163627 speed=43908.398
count=253310 speed=44841.976
count=339962 speed=43325.525
count=419818 speed=39929.276
count=509269 speed=44725.767
count=593292 speed=42011.710
count=678407 speed=42557.688
count=761588 speed=41590.897
count=850561 speed=44486.792
local fan = require "fan"
local connector = require "fan.connector"
local utils = require "fan.utils"
local co = coroutine.create(function()
    serv = connector.bind("tcp://127.0.0.1:10000")
    serv.onaccept = function(apt)
        -- apt.simulate_send_block = false
        print("onaccept")
        while true do
            local input = apt:receive()
            if not input then
                break
            end
            local buf = input:GetBytes()
            apt:send(buf)
        end
    end
end)
coroutine.resume(co)
fan.loop(function()
    cli = connector.connect("tcp://127.0.0.1:10000")
    -- cli.simulate_send_block = false
    cli:send("hi")
    coroutine.wrap(function()
        while true do
            fan.sleep(1)
            cli:send("noise")
        end
        end)()
    local count = 0
    local last_count = 0
    local last_time = utils.gettime()
    coroutine.wrap(function()
        while true do
          fan.sleep(2)
          print(string.format("count=%d speed=%1.03f", count, (count - last_count) / (utils.gettime() - last_time)))
          last_time = utils.gettime()
          last_count = count
        end
    end)()
    local data = os.date()
    while true do
        local input = cli:receive()
        if not input then
            break
        end
        -- print("client read", input)
        input:GetBytes()
        count = count + 1
        cli:send(data)
     end
end)
connector.udp io benchmark
                373Mb         745Mb          1.09Gb        1.46Gb
  └─────────────┴─────────────┴──────────────┴─────────────┴──────────────
  localhost              => localhost              1.54Gb  1.53Gb  1.51Gb
                         <=                           0b      0b      0b
  ────────────────────────────────────────────────────────────────────────
  TX:             cum:   26.4GB   peak:   rates:   1.54Gb  1.53Gb  1.51Gb
  RX:                       0B               0b       0b      0b      0b
  TOTAL:                 26.4GB           1.54Gb   1.54Gb  1.53Gb  1.51Gb
local fan = require "fan"
local config = require "config"
-- override config settings.
config.udp_mtu = 8192
config.udp_waiting_count = 10
config.udp_package_timeout = 3
local utils = require "fan.utils"
local connector = require "fan.connector"
if fan.fork() > 0 then
  local longstr = string.rep("abc", 102400)
  print(#(longstr))
  fan.loop(function()
      fan.sleep(1)
      cli = connector.connect("udp://127.0.0.1:10000")
      local count = 0
      local last_count = 0
      local last_time = utils.gettime()
      -- coroutine.wrap(function()
      --     while true do
      --       fan.sleep(2)
      --       print(string.format("count=%d speed=%1.03f", count, (count - last_count) / (utils.gettime() - last_time)))
      --       last_time = utils.gettime()
      --       last_count = count
      --     end
      -- end)()
      cli.onread = function(apt, body)
        count = count + 1
        -- print("cli onread", #(body))
        -- assert(body == longstr)
        -- local start = utils.gettime()
        cli:send(longstr)
        -- print(utils.gettime() - start)
      end
      cli:send(longstr)
    end)
else
  local co = coroutine.create(function()
      serv = connector.bind("udp://127.0.0.1:10000")
      serv.onaccept = function(apt)
        print("onaccept")
        apt.onread = function(apt, body)
          -- print("apt onread", #(body))
          apt:send(body)
        end
      end
    end)
  assert(coroutine.resume(co))
  fan.loop()
end
connector.tcp io benchmark
iftop result: 338 MB/s
              369Mb         738Mb          1.08Gb        1.44Gb   1.80Gb
└─────────────┴─────────────┴──────────────┴─────────────┴──────────────
localhost              => localhost              2.71Gb  2.63Gb  2.64Gb
                       <=                           0b      0b      0b
────────────────────────────────────────────────────────────────────────
TX:             cum:   25.4GB   peak:   rates:   2.71Gb  2.63Gb  2.64Gb
RX:                       0B               0b       0b      0b      0b
TOTAL:                 25.4GB           2.71Gb   2.71Gb  2.63Gb  2.64Gb
local fan = require "fan"
local stream = require "fan.stream"
local connector = require "fan.connector"
local data = string.rep([[coroutine.create]], 1000)
local co = coroutine.create(function()
    serv = connector.bind("tcp://127.0.0.1:10000")
    serv.onaccept = function(apt)
      print("onaccept")
      local last_expect = 1
      while true do
        local input = apt:receive(last_expect)
        if not input then
          break
        end
        -- print("serv read", input:available())
        local str,expect = input:GetString()
        if str then
          last_expect = 1
          assert(str == data)
          local d = stream.new()
          d:AddString(str)
          apt:send(d:package())
        else
          last_expect = expect
        end
      end
    end
  end)
coroutine.resume(co)
fan.loop(function()
    cli = connector.connect("tcp://127.0.0.1:10000")
    local d = stream.new()
    d:AddString(data)
    cli:send(d:package())
    local last_expect = 1
    while true do
      local input = cli:receive(last_expect)
      if not input then
        break
      end
      -- print("cli read", input:available())
      local str,expect = input:GetString()
      if str then
        last_expect = 1
        assert(str == data)
        local d = stream.new()
        d:AddString(str)
        cli:send(d:package())
      else
        last_expect = expect
      end
    end
  end)