聊聊Node.js中的网络与流
本篇文章带大家聊聊Node.js中的网络与流,涉及的知识点有ibuv中网络的实现、BSD 套接字、UNIX 域协议使用等,下面一起来看看吧!
【推荐学习:《nodejs 教程》】
本篇例子来源:http://docs.libuv.org/en/v1.x/guide/networking.html
涉及的知识点
- libuv 中网络的实现
- libuv 解决 accept (EMFILE错误)
- BSD 套接字
- SOCKADDR_IN
- UNIX 域协议使用! 在进程间传递“文件描述符”
例子 tcp-echo-server/main.c
libuv 异步使用 BSD 套接字 的例子
libuv 中的网络和直接使用 BSD 套接字接口没有什么不同,有些事情更简单,都是无阻塞的,但概念都是一样的。此外,libuv 还提供了一些实用的函数来抽象出那些烦人的、重复的、低级的任务,比如使用BSD套接字结构设置套接字、DNS查询以及调整各种套接字参数。
int main() { loop = uv_default_loop(); uv_tcp_t server; uv_tcp_init(loop, &server); uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr); uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0); int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection); if (r) { fprintf(stderr, "Listen error %s\n", uv_strerror(r)); return 1; } return uv_run(loop, UV_RUN_DEFAULT); } void on_new_connection(uv_stream_t *server, int status) { if (status < 0) { fprintf(stderr, "New connection error %s\n", uv_strerror(status)); // error! return; } uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t)); uv_tcp_init(loop, client); if (uv_accept(server, (uv_stream_t*) client) == 0) { uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read); }
同步的例子
这是一个正常同步使用 BSD 套接字 的例子。
作为参照可以发现主要有如下几步
-
首先调用 socket() 为通讯创建一个端点,为套接字返回一个文件描述符。
-
接着调用 bind() 为一个套接字分配地址。当使用 socket() 创建套接字后,只赋予其所使用的协议,并未分配地址。在接受其它主机的连接前,必须先调用 bind() 为套接字分配一个地址。
-
当 socket 和一个地址绑定之后,再调用 listen() 函数会开始监听可能的连接请求。
-
最后调用 accept, 当应用程序监听来自其他主机的面对数据流的连接时,通过事件(比如Unix select()系统调用)通知它。必须用 accept()函数初始化连接。 accept() 为每个连接创立新的套接字并从监听队列中移除这个连接。
int main(void) { struct sockaddr_in stSockAddr; int SocketFD = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); if(-1 == SocketFD) { perror("can not create socket"); exit(EXIT_FAILURE); } memset(&stSockAddr, 0, sizeof(struct sockaddr_in)); stSockAddr.sin_family = AF_INET; stSockAddr.sin_port = htons(1100); stSockAddr.sin_addr.s_addr = INADDR_ANY; if(-1 == bind(SocketFD,(const struct sockaddr *)&stSockAddr, sizeof(struct sockaddr_in))) { perror("error bind failed"); close(SocketFD); exit(EXIT_FAILURE); } if(-1 == listen(SocketFD, 10)) { perror("error listen failed"); close(SocketFD); exit(EXIT_FAILURE); } for(;;) { int ConnectFD = accept(SocketFD, NULL, NULL); if(0 > ConnectFD) { perror("error accept failed"); close(SocketFD); exit(EXIT_FAILURE); } /* perform read write operations ... */ shutdown(ConnectFD, SHUT_RDWR); close(ConnectFD); } close(SocketFD); return 0; }
uv_tcp_init
main > uv_tcp_init
1、对 domain 进行了验证, 需要是下面3种的一种
- AF_INET 表示 IPv4 网络协议
- AF_INET6 表示 IPv6
- AF_UNSPEC 表示适用于指定主机名和服务名且适合任何协议族的地址
2、tcp 也是一种流, 调用 uv__stream_init 对流数据进行初始化
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) { return uv_tcp_init_ex(loop, tcp, AF_UNSPEC); } int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* tcp, unsigned int flags) { int domain; /* Use the lower 8 bits for the domain */ domain = flags & 0xFF; if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC) return UV_EINVAL; if (flags & ~0xFF) return UV_EINVAL; uv__stream_init(loop, (uv_stream_t*)tcp, UV_TCP); ... return 0; }
uv__stream_init
main > uv_tcp_init > uv__stream_init
流的初始化函数使用的地方还是特别多的, 也特别重要。下述 i/o 的完整实现参考 【libuv 源码学习笔记】线程池与i/o
1、对流会被调用的回调函数等进行一个初始化
- 如 read_cb 函数, 在本例子中 on_new_connection > uv_read_start 函数就会真实的设置该 read_cb 为用户传入的参数 echo_read, 其被调用时机是该 stream 上设置的 io_watcher.fd 有数据写入时, 在事件循环阶段被 epoll 捕获后。
- alloc_cb 函数的调用过程同 read_cb, alloc 类型函数一般是设置当前需要读取的内容长度, 在流数据传输时通常首先会写入本次传输数据的长度, 然后是具体的内容, 主要是为了接收方能够合理的申请内存进行存储。如 grpc, thread-loader 中都有详细的应用。
- close_cb 函数被调用在 stream 数据结束时或者出错时。
- connection_cb 函数如本例子 tcp 流, 当 accept 接收到新连接时被调用。本例子中即为 on_new_connection
- connect_req 结构主要用于 tcp 客户端相关连接回调等数据的挂载使用。
- shutdown_req 结构主要用于流 destroy 时回调等数据的挂载使用。
- accepted_fd 当 accept 接收到新连接时, 存储 accept(SocketFD, NULL, NULL) 返回的 ConnectFD。
- queued_fds 用于保存等待处理的连接, 其主要用于 node cluster 集群 的实现。
// queued_fds 1. 当收到其他进程通过 ipc 写入的数据时, 调用 uv__stream_recv_cmsg 函数 2. uv__stream_recv_cmsg 函数读取到进程传递过来的 fd 引用, 调用 uv__stream_queue_fd 函数保存。 3. queued_fds 被消费主要在 src/stream_wrap.cc LibuvStreamWrap::OnUvRead > AcceptHandle 函数中。
2、其中专门为 loop->emfile_fd 通过 uv__open_cloexec 方法创建一个指向空文件(/dev/null)的 idlefd 文件描述符, 追踪发现原来是解决 accept (EMFILE错误), 下面我们讲 uv__accept 的时候再细说这个 loop->emfile_fd 的妙用。
accept处理连接时,若出现 EMFILE 错误不进行处理,则内核间隔性尝试连接,导致整个网络设计程序崩溃
3、调用 uv__io_init 初始化的该 stream 的 i/o 观察者的回调函数为 uv__stream_io
void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream, uv_handle_type type) { int err; uv__handle_init(loop, (uv_handle_t*)stream, type); stream->read_cb = NULL; stream->alloc_cb = NULL; stream->close_cb = NULL; stream->connection_cb = NULL; stream->connect_req = NULL; stream->shutdown_req = NULL; stream->accepted_fd = -1; stream->queued_fds = NULL; stream->delayed_error = 0; QUEUE_INIT(&stream->write_queue); QUEUE_INIT(&stream->write_completed_queue); stream->write_queue_size = 0; if (loop->emfile_fd == -1) { err = uv__open_cloexec("/dev/null", O_RDONLY); if (err < 0) /* In the rare case that "/dev/null" isn't mounted open "/" * instead. */ err = uv__open_cloexec("/", O_RDONLY); if (err >= 0) loop->emfile_fd = err; } #if defined(__APPLE__) stream->select = NULL; #endif /* defined(__APPLE_) */ uv__io_init(&stream->io_watcher, uv__stream_io, -1); }
uv__open_cloexec
main > uv_tcp_init > uv__stream_init > uv__open_cloexec
同步调用 open 方法拿到了 fd, 也许你会问为啥不像 【libuv 源码学习笔记】线程池与i/o 中调用 uv_fs_open 异步获取 fd, 其实 libuv 中并不全部是异步的实现, 比如当前的例子启动 tcp 服务前的一些初始化, 而不是用户请求过程中发生的任务, 同步也是能接受的。
int uv__open_cloexec(const char* path, int flags) { #if defined(O_CLOEXEC) int fd; fd = open(path, flags | O_CLOEXEC); if (fd == -1) return UV__ERR(errno); return fd; #else /* O_CLOEXEC */ int err; int fd; fd = open(path, flags); if (fd == -1) return UV__ERR(errno); err = uv__cloexec(fd, 1); if (err) { uv__close(fd); return err; } return fd; #endif /* O_CLOEXEC */ }
uv__stream_io
main > uv_tcp_init > uv__stream_init > uv__stream_io
双工流的 i/o 观察者回调函数, 如调用的 stream->connect_req 函数, 其值是例子中 uv_listen 函数的最后一个参数 on_new_connection。
-
当发生 POLLIN | POLLERR | POLLHUP 事件时: 该 fd 有可读数据时调用 uv__read 函数
-
当发生 POLLOUT | POLLERR | POLLHUP 事件时: 该 fd 有可读数据时调用 uv__write 函数
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { uv_stream_t* stream; stream = container_of(w, uv_stream_t, io_watcher); assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); assert(!(stream->flags & UV_HANDLE_CLOSING)); if (stream->connect_req) { uv__stream_connect(stream); return; } assert(uv__stream_fd(stream) >= 0); if (events & (POLLIN | POLLERR | POLLHUP)) uv__read(stream); if (uv__stream_fd(stream) == -1) return; /* read_cb closed stream. */ if ((events & POLLHUP) && (stream->flags & UV_HANDLE_READING) && (stream->flags & UV_HANDLE_READ_PARTIAL) && !(stream->flags & UV_HANDLE_READ_EOF)) { uv_buf_t buf = { NULL, 0 }; uv__stream_eof(stream, &buf); } if (uv__stream_fd(stream) == -1) return; /* read_cb closed stream. */ if (events & (POLLOUT | POLLERR | POLLHUP)) { uv__write(stream); uv__write_callbacks(stream); /* Write queue drained. */ if (QUEUE_EMPTY(&stream->write_queue)) uv__drain(stream); } }
uv_ip4_addr
main > uv_ip4_addr
uv_ip4_addr 用于将人类可读的 IP 地址、端口对转换为 BSD 套接字 API 所需的 sockaddr_in 结构。
int uv_ip4_addr(const char* ip, int port, struct sockaddr_in* addr) { memset(addr, 0, sizeof(*addr)); addr->sin_family = AF_INET; addr->sin_port = htons(port); #ifdef SIN6_LEN addr->sin_len = sizeof(*addr); #endif return uv_inet_pton(AF_INET, ip, &(addr->sin_addr.s_addr)); }
uv_tcp_bind
main > uv_tcp_bind
从 uv_ip4_addr 函数的实现, 其实是在 addr 的 sin_family 上面设置值为 AF_INET, 但在 uv_tcp_bind 函数里面却是从 addr 的 sa_family属性上面取的值, 这让 c 初学者的我又陷入了一阵思考 …
sockaddr_in 和 sockaddr 是并列的结构,指向 sockaddr_in 的结构体的指针也可以指向 sockaddr 的结构体,并代替它。也就是说,你可以使用 sockaddr_in 建立你所需要的信息,然后用 memset 函数初始化就可以了memset((char*)&mysock,0,sizeof(mysock));//初始化
原来是这样, 这里通过强制指针类型转换 const struct sockaddr* addr 达到的目的, 函数的最后调用了 uv__tcp_bind 函数。
int uv_tcp_bind(uv_tcp_t* handle, const struct sockaddr* addr, unsigned int flags) { unsigned int addrlen; if (handle->type != UV_TCP) return UV_EINVAL; if (addr->sa_family == AF_INET) addrlen = sizeof(struct sockaddr_in); else if (addr->sa_family == AF_INET6) addrlen = sizeof(struct sockaddr_in6); else return UV_EINVAL; return uv__tcp_bind(handle, addr, addrlen, flags); }
uv__tcp_bind
main > uv_tcp_bind > uv__tcp_bind
-
调用 maybe_new_socket, 如果当前未设置 socketfd, 则调用 new_socket 获取
-
调用 setsockopt 用于为指定的套接字设定一个特定的套接字选项
-
调用 bind 为一个套接字分配地址。当使用socket()创建套接字后,只赋予其所使用的协议,并未分配地址。
int uv__tcp_bind(uv_tcp_t* tcp, const struct sockaddr* addr, unsigned int addrlen, unsigned int flags) { int err; int on; /* Cannot set IPv6-only mode on non-IPv6 socket. */ if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6) return UV_EINVAL; err = maybe_new_socket(tcp, addr->sa_family, 0); if (err) return err; on = 1; if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) return UV__ERR(errno); ... errno = 0; if (bind(tcp->io_watcher.fd, addr, addrlen) && errno != EADDRINUSE) { if (errno == EAFNOSUPPORT) return UV_EINVAL; return UV__ERR(errno); } ... }
new_socket
main > uv_tcp_bind > uv__tcp_bind > maybe_new_socket > new_socket
-
通过 uv__socket 其本质调用 socket 获取到 sockfd
-
调用 uv__stream_open 设置 stream i/o 观察的 fd 为步骤1 拿到的 sockfd
static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) { struct sockaddr_storage saddr; socklen_t slen; int sockfd; int err; err = uv__socket(domain, SOCK_STREAM, 0); if (err < 0) return err; sockfd = err; err = uv__stream_open((uv_stream_t*) handle, sockfd, flags); ... return 0; }
uv__stream_open
main > uv_tcp_bind > uv__tcp_bind > maybe_new_socket > new_socket > uv__stream_open
主要用于设置 stream->io_watcher.fd 为参数传入的 fd。
int uv__stream_open(uv_stream_t* stream, int fd, int flags) { #if defined(__APPLE__) int enable; #endif if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd)) return UV_EBUSY; assert(fd >= 0); stream->flags |= flags; if (stream->type == UV_TCP) { if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1)) return UV__ERR(errno); /* TODO Use delay the user passed in. */ if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) && uv__tcp_keepalive(fd, 1, 60)) { return UV__ERR(errno); } } #if defined(__APPLE__) enable = 1; if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) && errno != ENOTSOCK && errno != EINVAL) { return UV__ERR(errno); } #endif stream->io_watcher.fd = fd; return 0; }
uv_listen
main > uv_listen
主要调用了 uv_tcp_listen 函数。
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { int err; err = ERROR_INVALID_PARAMETER; switch (stream->type) { case UV_TCP: err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb); break; case UV_NAMED_PIPE: err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb); break; default: assert(0); } return uv_translate_sys_error(err); }
uv_tcp_listen
main > uv_listen > uv_tcp_listen
-
调用 listen 开始监听可能的连接请求
-
挂载例子中传入的回调 on_new_connection
-
暴力改写 i/o 观察者的回调, 在上面的 uv__stream_init 函数中, 通过 uv__io_init 设置了 i/o 观察者的回调为 uv__stream_io, 作为普通的双工流是适用的, 这里 tcp 流直接通过 tcp->io_watcher.cb = uv__server_io 赋值语句设置 i/o 观察者回调为 uv__server_io
-
调用 uv__io_start 注册 i/o 观察者, 开始监听工作。
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) { ... if (listen(tcp->io_watcher.fd, backlog)) return UV__ERR(errno); tcp->connection_cb = cb; tcp->flags |= UV_HANDLE_BOUND; /* Start listening for connections. */ tcp->io_watcher.cb = uv__server_io; uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN); return 0; }
uv__server_io
main > uv_listen > uv_tcp_listen > uv__server_io
tcp 流的 i/o 观察者回调函数
-
调用 uv__accept, 拿到该连接的 ConnectFD
-
此时如果出现了上面 uv__stream_init 时说的 accept (EMFILE错误), 则调用 uv__emfile_trick 函数
-
把步骤1拿到的 ConnectFD 挂载在了 stream->accepted_fd 上面
-
调用例子中传入的回调 on_new_connection
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { ... while (uv__stream_fd(stream) != -1) { assert(stream->accepted_fd == -1); err = uv__accept(uv__stream_fd(stream)); if (err < 0) { if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK)) return; /* Not an error. */ if (err == UV_ECONNABORTED) continue; /* Ignore. Nothing we can do about that. */ if (err == UV_EMFILE || err == UV_ENFILE) { err = uv__emfile_trick(loop, uv__stream_fd(stream)); if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK)) break; } stream->connection_cb(stream, err); continue; } UV_DEC_BACKLOG(w) stream->accepted_fd = err; stream->connection_cb(stream, 0); ... }
uv__emfile_trick
main > uv_listen > uv_tcp_listen > uv__server_io > uv__emfile_trick
在上面的 uv__stream_init 函数中, 我们发现 loop 的 emfile_fd 属性上通过 uv__open_cloexec 方法创建一个指向空文件(/dev/null)的 idlefd 文件描述符。
当出现 accept (EMFILE错误)即文件描述符用尽时的错误时
首先将 loop->emfile_fd 文件描述符, 使其能 accept 新连接, 然后我们新连接将其关闭,以使其低于EMFILE的限制。接下来,我们接受所有等待的连接并关闭它们以向客户发出信号,告诉他们我们已经超载了–我们确实超载了,但是我们仍在继续工作。
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) { int err; int emfile_fd; if (loop->emfile_fd == -1) return UV_EMFILE; uv__close(loop->emfile_fd); loop->emfile_fd = -1; do { err = uv__accept(accept_fd); if (err >= 0) uv__close(err); } while (err >= 0 || err == UV_EINTR); emfile_fd = uv__open_cloexec("/", O_RDONLY); if (emfile_fd >= 0) loop->emfile_fd = emfile_fd; return err; }
on_new_connection
当收到一个新连接, 例子中的 on_new_connection 函数被调用
-
通过 uv_tcp_init 初始化了一个 tcp 客户端流
-
调用 uv_accept 函数
void on_new_connection(uv_stream_t *server, int status) { if (status < 0) { fprintf(stderr, "New connection error %s\n", uv_strerror(status)); // error! return; } uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t)); uv_tcp_init(loop, client); if (uv_accept(server, (uv_stream_t*) client) == 0) { uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read); }
uv_accept
on_new_connection > uv_accept
根据不同的协议调用不同的方法, 该例子 tcp 调用 uv__stream_open 方法
uv__stream_open 设置给初始化完成的 client 流设置了 i/o 观察者的 fd。该 fd 即是 uv__server_io 中提到的 ConnectFD 。
int uv_accept(uv_stream_t* server, uv_stream_t* client) { int err; assert(server->loop == client->loop); if (server->accepted_fd == -1) return UV_EAGAIN; switch (client->type) { case UV_NAMED_PIPE: case UV_TCP: err = uv__stream_open(client, server->accepted_fd, UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); if (err) { /* TODO handle error */ uv__close(server->accepted_fd); goto done; } break; case UV_UDP: err = uv_udp_open((uv_udp_t*) client, server->accepted_fd); if (err) { uv__close(server->accepted_fd); goto done; } break; default: return UV_EINVAL; } client->flags |= UV_HANDLE_BOUND; done: /* Process queued fds */ if (server->queued_fds != NULL) { uv__stream_queued_fds_t* queued_fds; queued_fds = server->queued_fds; /* Read first */ server->accepted_fd = queued_fds->fds[0]; /* All read, free */ assert(queued_fds->offset > 0); if (--queued_fds->offset == 0) { uv__free(queued_fds); server->queued_fds = NULL; } else { /* Shift rest */ memmove(queued_fds->fds, queued_fds->fds + 1, queued_fds->offset * sizeof(*queued_fds->fds)); } } else { server->accepted_fd = -1; if (err == 0) uv__io_start(server->loop, &server->io_watcher, POLLIN); } return err; }
uv_read_start
on_new_connection > uv_read_start
开启一个流的监听工作
-
挂载回调函数 read_cb 为例子中的 echo_read, 当流有数据写入时被调用
-
挂载回调函数 alloc_cb 为例子中的 alloc_buffer
-
调用 uv__io_start 函数, 这可是老朋友了, 通常用在 uv__io_init 初始化 i/o 观察者后面, 用于注册 i/o 观察者。
uv_read_start 主要是调用了 uv__read_start 函数。开始了普通流的 i/o 过程。
- 初始化 i/o 观察者在 uv_tcp_init > uv_tcp_init_ex > uv__stream_init > uv__io_init 设置其观察者回调函数为 uv__stream_io
- 注册 i/o 观察者为 uv__io_start 开始监听工作。
int uv__read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); /* The UV_HANDLE_READING flag is irrelevant of the state of the tcp - it just * expresses the desired state of the user. */ stream->flags |= UV_HANDLE_READING; /* TODO: try to do the read inline? */ /* TODO: keep track of tcp state. If we've gotten a EOF then we should * not start the IO watcher. */ assert(uv__stream_fd(stream) >= 0); assert(alloc_cb); stream->read_cb = read_cb; stream->alloc_cb = alloc_cb; uv__io_start(stream->loop, &stream->io_watcher, POLLIN); uv__handle_start(stream); uv__stream_osx_interrupt_select(stream); return 0; }
小结
- uv_tcp_init 初始化 TCP Server handle, 其绑定的 fd 为 socket 返回的 socketFd。
- uv_tcp_bind 调用 bind 为套接字分配一个地址
- uv_listen 调用 listen 开始监听可能的连接请求
- uv_accept 调用 accept 去接收一个新连接
- uv_tcp_init 初始化 TCP Client handle, 其绑定的 fd 为 accept 返回的 acceptFd, 剩下的就是一个普通流的读写 i/o 观察。
原文地址:https://juejin.cn/post/6982226661081088036
作者:多小凯
更多编程相关知识,请访问:编程视频!!
以上就是聊聊Node.js中的网络与流的详细内容,更多请关注其它相关文章!