luafan

APIs

Goals

Connector is used to quickly create tcp/udp/fifo connections through the same interface. It simulates blocking API calls for tcp/fifo, so it’s easy to write functions without callback hell.

create a logical connection to fifo/tcp/udp

bind a listener of fifo/tcp/udp

create a temporary fifo file that will not conflict with others.

URL_Format

CLI_APIs

yield until buf has been sent; if #buf is too big, it will be divided into parts (fifo MAX_LINE_SIZE = 8192).

yield until the expected amount of data is ready to read, then return the read stream (fan.stream); the default expect_length is 1.

close this connection.

send buf in the background, using an embedded private protocol to control the sequence; the buf size limit is around 65536 * (MTU payload length), which supports at least 33.75MB (65536 * (576 - 8 - 20 - 8)) over the internet.

input buffer callback, using an embedded private protocol to control the sequence; when all parts of the buffer have been received, this callback will be invoked.

output buffer sent callback, when all parts of the buffer have been sent, this callback will be invoked.

output buffer timeout callback; if the callback returns false, the package will be dropped, otherwise the package will be resent.

SERV

Samples

benchmark server

Linux XXXXXX 4.4.0-70-generic #91-Ubuntu SMP Wed Mar 22 12:47:43 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux

Intel(R) Core(TM) i7-2640M CPU @ 2.80GHz

MemTotal:        3922816 kB
count=118723 speed=59361.918
count=237557 speed=59417.623
count=356271 speed=59357.149
count=475312 speed=59521.302
count=593905 speed=59297.002
count=711984 speed=59040.119
count=830336 speed=59176.649
count=949218 speed=59441.624
count=1067916 speed=59349.566
local fan = require "fan"
local connector = require "fan.connector"
local utils = require "fan.utils"

-- Temporary fifo name obtained from the connector, turned into a "fifo:" url
-- shared by the server below and the client further down.
local fifoname = connector.tmpfifoname()
local url = "fifo:" .. fifoname

-- Payload the client sends on every round trip. A repeat count of 1 keeps it
-- tiny for the ping/pong test (the original note names 1024 as the other value).
local data = string.rep("abc", 1)

-- Echo server: bind the fifo url and send every received buffer straight back
-- to the peer until receive() stops returning a stream.
local function run_echo_server()
    -- `serv` is a global, exactly as in the original sample.
    -- NOTE(review): presumably this keeps the listener reachable after the
    -- coroutine has finished - confirm before making it local.
    serv = connector.bind(url)
    serv.onaccept = function(apt)
        -- apt.simulate_send_block = false
        print("onaccept")
        local input = apt:receive()
        while input do
            apt:send(input:GetBytes())
            input = apt:receive()
        end
    end
end

local co = coroutine.create(run_echo_server)
-- Print the resume result so a failure inside the coroutine is visible.
print(coroutine.resume(co))

-- Benchmark client: connect to the echo server, keep one message in flight,
-- count completed round trips and print the rate after every fan.sleep(2).
fan.loop(function()
    -- `cli` is left global, as in the original sample.
    cli = connector.connect(url)
    -- cli.simulate_send_block = false
    print(cli:send(data))

    local count = 0
    local last_count = 0
    local last_time = utils.gettime()

    -- Reporter coroutine: prints the running total and the round trips per
    -- time unit since the previous report.
    coroutine.wrap(function()
        while true do
            fan.sleep(2)
            -- Read the clock once, so the end of the measured interval and the
            -- start of the next one are the same instant (two separate reads
            -- would leave the time spent printing out of every interval).
            local now = utils.gettime()
            print(string.format("count=%d speed=%1.03f", count, (count - last_count) / (now - last_time)))
            last_time = now
            last_count = count
        end
    end)()

    while true do
        local input = cli:receive()
        if not input then
            break
        end

        -- Read the received bytes as the original did; the value is not used.
        input:GetBytes()
        count = count + 1

        cli:send(data)
    end
end)
count=75811 speed=37907.827
count=163627 speed=43908.398
count=253310 speed=44841.976
count=339962 speed=43325.525
count=419818 speed=39929.276
count=509269 speed=44725.767
count=593292 speed=42011.710
count=678407 speed=42557.688
count=761588 speed=41590.897
count=850561 speed=44486.792
local fan = require "fan"
local connector = require "fan.connector"
local utils = require "fan.utils"

-- Echo server: bind tcp://127.0.0.1:10000 and send every received buffer
-- straight back to the peer until receive() stops returning a stream.
local co = coroutine.create(function()
    -- `serv` is a global, as in the original sample.
    -- NOTE(review): presumably this keeps the listener reachable after the
    -- coroutine has finished - confirm before making it local.
    serv = connector.bind("tcp://127.0.0.1:10000")
    serv.onaccept = function(apt)
        -- apt.simulate_send_block = false
        print("onaccept")
        while true do
            local input = apt:receive()
            if not input then
                break
            end

            local buf = input:GetBytes()
            apt:send(buf)
        end
    end
end)
-- coroutine.resume returns false plus a message when the coroutine body
-- raises; assert turns that into a visible error instead of discarding it.
assert(coroutine.resume(co))

-- Benchmark client: connect to the tcp echo server, keep messages in flight,
-- count received replies and print the rate after every fan.sleep(2).
fan.loop(function()
    -- `cli` is left global, as in the original sample.
    cli = connector.connect("tcp://127.0.0.1:10000")
    -- cli.simulate_send_block = false
    cli:send("hi")

    -- Extra sender: pushes a "noise" message after every fan.sleep(1), in
    -- parallel with the ping/pong loop below.
    coroutine.wrap(function()
        while true do
            fan.sleep(1)
            cli:send("noise")
        end
    end)()

    local count = 0
    local last_count = 0
    local last_time = utils.gettime()

    -- Reporter coroutine: prints the running total and the replies per time
    -- unit since the previous report.
    coroutine.wrap(function()
        while true do
            fan.sleep(2)
            -- Read the clock once, so the end of the measured interval and the
            -- start of the next one are the same instant (two separate reads
            -- would leave the time spent printing out of every interval).
            local now = utils.gettime()
            print(string.format("count=%d speed=%1.03f", count, (count - last_count) / (now - last_time)))
            last_time = now
            last_count = count
        end
    end)()

    -- Payload sent back for every reply received: the date string at startup.
    local data = os.date()

    while true do
        local input = cli:receive()
        if not input then
            break
        end

        -- print("client read", input)
        -- Read the received bytes; the value is not used.
        input:GetBytes()
        count = count + 1

        cli:send(data)
    end
end)
                373Mb         745Mb          1.09Gb        1.46Gb
  └─────────────┴─────────────┴──────────────┴─────────────┴──────────────
  localhost              => localhost              1.54Gb  1.53Gb  1.51Gb
                         <=                           0b      0b      0b

  ────────────────────────────────────────────────────────────────────────
  TX:             cum:   26.4GB   peak:   rates:   1.54Gb  1.53Gb  1.51Gb
  RX:                       0B               0b       0b      0b      0b
  TOTAL:                 26.4GB           1.54Gb   1.54Gb  1.53Gb  1.51Gb
local fan = require "fan"
local config = require "config"

-- override config settings.
-- NOTE(review): these assignments sit before `require "fan.connector"` below;
-- presumably the connector reads them when it loads - confirm before reordering.
config.udp_mtu = 8192
config.udp_waiting_count = 10
-- NOTE(review): unit of the timeout is not visible here (presumably seconds) - confirm.
config.udp_package_timeout = 3

local utils = require "fan.utils"
local connector = require "fan.connector"

-- Fork into two processes: the one that sees a fork result > 0 acts as the UDP
-- client, the other one acts as the UDP echo server.
local pid = fan.fork()

if pid > 0 then
  -- Client side: send one large string and resend it each time a reply arrives.
  local longstr = string.rep("abc", 102400)
  print(#longstr)

  local function client_main()
    -- NOTE(review): presumably gives the server process time to bind - confirm.
    fan.sleep(1)
    -- `cli` is a global, exactly as in the original sample.
    cli = connector.connect("udp://127.0.0.1:10000")

    -- Counters used by the (disabled) rate reporter below.
    local count = 0
    local last_count = 0
    local last_time = utils.gettime()

    -- coroutine.wrap(function()
    --     while true do
    --       fan.sleep(2)
    --       print(string.format("count=%d speed=%1.03f", count, (count - last_count) / (utils.gettime() - last_time)))
    --       last_time = utils.gettime()
    --       last_count = count
    --     end
    -- end)()

    -- Every complete reply triggers the next send of the same payload.
    cli.onread = function(_conn, _body)
      count = count + 1
      -- print("cli onread", #(_body))
      -- assert(_body == longstr)
      cli:send(longstr)
    end

    -- Kick off the exchange with the first send.
    cli:send(longstr)
  end

  fan.loop(client_main)
else
  -- Server side: echo every received body back to the peer it came from.
  local function serve()
    -- `serv` is a global, exactly as in the original sample.
    serv = connector.bind("udp://127.0.0.1:10000")
    serv.onaccept = function(apt)
      print("onaccept")
      apt.onread = function(peer, body)
        -- print("apt onread", #(body))
        peer:send(body)
      end
    end
  end

  -- Fail loudly if setting up the listener raises.
  assert(coroutine.resume(coroutine.create(serve)))

  fan.loop()

end
              369Mb         738Mb          1.08Gb        1.44Gb   1.80Gb
└─────────────┴─────────────┴──────────────┴─────────────┴──────────────
localhost              => localhost              2.71Gb  2.63Gb  2.64Gb
                       <=                           0b      0b      0b

────────────────────────────────────────────────────────────────────────
TX:             cum:   25.4GB   peak:   rates:   2.71Gb  2.63Gb  2.64Gb
RX:                       0B               0b       0b      0b      0b
TOTAL:                 25.4GB           2.71Gb   2.71Gb  2.63Gb  2.64Gb
local fan = require "fan"
local stream = require "fan.stream"
local connector = require "fan.connector"

-- Test payload: the text "coroutine.create" repeated 1000 times. The client
-- further down sends it and both sides assert that it arrives unchanged.
local data = string.rep([[coroutine.create]], 1000)

-- Echo server for packaged strings: read a string from the incoming stream,
-- check it against `data`, and send it back as a new package.
local co = coroutine.create(function()
    -- `serv` is a global, as in the original sample.
    -- NOTE(review): presumably this keeps the listener reachable after the
    -- coroutine has finished - confirm before making it local.
    serv = connector.bind("tcp://127.0.0.1:10000")
    serv.onaccept = function(apt)
        print("onaccept")
        -- Value passed to the next receive(); starts at 1.
        local last_expect = 1

        while true do
            local input = apt:receive(last_expect)
            if not input then
                break
            end
            -- print("serv read", input:available())

            -- GetString yields the string, or a falsy value plus a second
            -- result that is fed into the next receive() call.
            local str, expect = input:GetString()
            if str then
                last_expect = 1

                assert(str == data)

                local d = stream.new()
                d:AddString(str)
                apt:send(d:package())
            else
                last_expect = expect
            end
        end
    end
end)
-- coroutine.resume returns false plus a message when the coroutine body
-- raises; assert turns that into a visible error instead of discarding it.
assert(coroutine.resume(co))

-- Client for the packaged-string echo test: send `data` as one package, then
-- keep bouncing every complete string that comes back.
fan.loop(function()
    -- `cli` is a global, exactly as in the original sample.
    cli = connector.connect("tcp://127.0.0.1:10000")

    -- Put a string into a fresh stream, package it and send it.
    local function send_string(str)
      local out = stream.new()
      out:AddString(str)
      cli:send(out:package())
    end

    send_string(data)

    -- Value passed to the next receive(); 1 until GetString asks for more.
    local want = 1
    local input = cli:receive(want)

    while input do
      -- print("cli read", input:available())
      local str, expect = input:GetString()
      if str then
        -- A complete string arrived: verify it and echo it back.
        assert(str == data)
        send_string(str)
        want = 1
      else
        -- No complete string yet: pass GetString's second result on.
        want = expect
      end
      input = cli:receive(want)
    end
  end)