diff --git a/src/safeposix/syscalls/net_calls.rs b/src/safeposix/syscalls/net_calls.rs index 678a2451b..273bbc148 100644 --- a/src/safeposix/syscalls/net_calls.rs +++ b/src/safeposix/syscalls/net_calls.rs @@ -2896,12 +2896,77 @@ impl Cage { return 0; } + /// ## ------------------POLL SYSCALL------------------ + /// ### Description + /// poll_syscall performs a similar task to select_syscall: it waits for + /// one of a set of file descriptors to become ready to perform I/O. + + /// ### Function Arguments + /// The `poll_syscall()` receives two arguments: + /// * `fds` - The set of file descriptors to be monitored is specified in + /// the fds argument, which is an array of PollStruct structures + /// containing three fields: fd, events and revents. events and revents + /// are requested events and returned events, respectively. The field fd + /// contains a file descriptor for an open file. If this field is + /// negative, then the corresponding events field is ignored and the + /// revents field returns zero. The field events is an input parameter, a + /// bit mask specifying the events the application is interested in for + /// the file descriptor fd. The bits returned in revents can include any + /// of those specified in events, or POLLNVAL. The bits that may be + /// set/returned in events and revents are: 1. POLLIN: There is data to + /// read. 2. POLLPRI: There is some exceptional condition on the file + /// descriptor, currently not supported 3. POLLOUT: Writing is now + /// possible, though a write larger than the available space in a socket + /// or pipe will still block + /// 4. POLLNVAL: Invalid request: fd not open (only returned in revents; + /// ignored in events). + /// * `timeout` - The timeout argument is a RustDuration structure that + /// specifies the interval that poll() should block waiting for a file + /// descriptor to become ready. The call will block until either: 1. a + /// file descriptor becomes ready; 2. the call is interrupted by a signal + /// handler; 3. the timeout expires. + + /// ### Returns + /// On success, poll_syscall returns a nonnegative value which is the + /// number of elements in the pollfds whose revents fields have been + /// set to a nonzero value (indicating an event or an error). A + /// return value of zero indicates that the system call timed out + /// before any file descriptors became ready. + /// + /// ### Errors + /// * EINTR - A signal was caught. + /// * EINVAL - fd exceeds the FD_SET_MAX_FD. + /// + /// ### Panics + /// No panic is expected from this syscall pub fn poll_syscall( &self, fds: &mut [PollStruct], timeout: Option, ) -> i32 { - //timeout is supposed to be in milliseconds + // timeout is supposed to be in milliseconds + + // current implementation of poll_syscall is based on select_syscall + // which gives several issues: + // 1. according to standards, select_syscall should only support file descriptor + // that is smaller than 1024, while poll_syscall should not have such + // limitation but our implementation of poll_syscall is actually calling + // select_syscall directly which would mean poll_syscall would also have the + // 1024 maximum size limitation However, rustposix itself only support file + // descriptor that is smaller than 1024 which solves this issue automatically + // in an interesting way + // 2. current implementation of poll_syscall is very inefficient, that it passes + // each of the file descriptor into select_syscall one by one. A better + // solution might be transforming pollstruct into fdsets and pass into + // select_syscall once (TODO). A even more efficienct way would be completely + // rewriting poll_syscall so it does not depend on select_syscall anymore. + // This is also how Linux does for poll_syscall since Linux claims that poll + // have a better performance than select. + // 3. several revent value such as POLLERR (which should be set when pipe is + // broken), or POLLHUP (when peer closed its channel) are not possible to + // monitor. Since select_syscall does not have these features, so our + // poll_syscall, which derived from select_syscall, would subsequently not be + // able to support these features. let mut return_code: i32 = 0; let start_time = interface::starttimer(); @@ -2911,9 +2976,26 @@ impl Cage { None => interface::RustDuration::MAX, }; + // according to standard, we should clear all revents + for structpoll in &mut *fds { + structpoll.revents = 0; + } + + // we loop until either timeout + // or any of the file descriptor is ready loop { + // iterate through each file descriptor for structpoll in &mut *fds { + // get the file descriptor let fd = structpoll.fd; + + // according to standard, we should ignore all file descriptor + // that is smaller than 0 + if fd < 0 { + continue; + } + + // get the associated events to monitor let events = structpoll.events; // init FdSet structures @@ -2921,24 +3003,25 @@ impl Cage { let writes = &mut interface::FdSet::new(); let errors = &mut interface::FdSet::new(); - //read + // POLLIN for readable fd if events & POLLIN > 0 { reads.set(fd) } - //write + // POLLOUT for writable fd if events & POLLOUT > 0 { writes.set(fd) } - //err - if events & POLLERR > 0 { + // POLLPRI for except fd + if events & POLLPRI > 0 { errors.set(fd) } + // this mask is used for storing final revent result let mut mask: i16 = 0; - //0 essentially sets the timeout to the max value allowed (which is almost - // always more than enough time) NOTE that the nfds argument is - // highest fd + 1 + // here we just call select_syscall with timeout of zero, + // which essentially just check each fd set once then return + // NOTE that the nfds argument is highest fd + 1 let selectret = Self::select_syscall( &self, fd + 1, @@ -2947,23 +3030,44 @@ impl Cage { Some(errors), Some(interface::RustDuration::ZERO), ); + // if there is any file descriptor ready if selectret > 0 { - mask += if !reads.is_empty() { POLLIN } else { 0 }; - mask += if !writes.is_empty() { POLLOUT } else { 0 }; - mask += if !errors.is_empty() { POLLERR } else { 0 }; + // is the file descriptor ready to read? + mask |= if !reads.is_empty() { POLLIN } else { 0 }; + // is the file descriptor ready to write? + mask |= if !writes.is_empty() { POLLOUT } else { 0 }; + // is there any exception conditions on the file descriptor? + mask |= if !errors.is_empty() { POLLPRI } else { 0 }; + // this file descriptor is ready for something, + // increment the return value return_code += 1; } else if selectret < 0 { - return selectret; + // if there is any error, first check if the error + // is EBADF, which refers to invalid file descriptor error + // in this case, we should set POLLNVAL to revent + if selectret == -(Errno::EBADF as i32) { + mask |= POLLNVAL; + // according to standard, return value is the number of fds + // with non-zero revent, which may indicate an error as well + return_code += 1; + } else { + return selectret; + } } + // set the revents structpoll.revents = mask; } + // we break if there is any file descriptor ready + // or timeout is reached if return_code != 0 || interface::readtimer(start_time) > end_time { break; } else { + // otherwise, check for signal and loop again if interface::sigcheck() { return syscall_error(Errno::EINTR, "poll", "interrupted function call"); } + // We yield to let other threads continue if we've found no ready descriptors interface::lind_yield(); } } diff --git a/src/tests/networking_tests.rs b/src/tests/networking_tests.rs index f56e3bf1d..1c39f8f5a 100644 --- a/src/tests/networking_tests.rs +++ b/src/tests/networking_tests.rs @@ -670,19 +670,248 @@ pub mod net_tests { } #[test] - pub fn ut_lind_net_poll() { - //acquiring a lock on TESTMUTEX prevents other tests from running concurrently, + pub fn ut_lind_net_poll_bad_input() { + // this test is used for testing poll with some error/edge cases + + // acquiring a lock on TESTMUTEX prevents other tests from running concurrently, + // and also performs clean env setup + let _thelock = setup::lock_and_init(); + + let cage = interface::cagetable_getref(1); + + // error case 1: invalid file descriptor + // contruct a PollStruct with invalid fd (10) + let mut polled = vec![interface::PollStruct { + fd: 10, + events: POLLIN, + revents: 0, + }]; + + // exactly one fd should have non-zero revents field + assert_eq!(cage.poll_syscall(&mut polled.as_mut_slice(), None), 1); + // and its revents should be set to POLLNVAL + assert_eq!(polled[0].revents, POLLNVAL); + + // error case 2: negative file descriptor should be ignored + // contruct a PollStruct with negative fd + let mut polled = vec![interface::PollStruct { + fd: -1, + events: POLLIN, + revents: 0, + }]; + + // the fd should be ignored, so no error is expected + assert_eq!( + cage.poll_syscall( + &mut polled.as_mut_slice(), + Some(interface::RustDuration::ZERO) + ), + 0 + ); + // revents should be 0 + assert_eq!(polled[0].revents, 0); + + // edge case: revents should always be cleared + // create a file + let filefd = cage.open_syscall("/netpolltest.txt", O_CREAT | O_EXCL | O_RDWR, S_IRWXA); + assert!(filefd > 0); + // create a pipe + let mut pipefds = PipeArray { + readfd: -1, + writefd: -1, + }; + assert_eq!(cage.pipe2_syscall(&mut pipefds, O_NONBLOCK), 0); + // contruct a PollStruct with three PollStruct: + // 1. normal file with non-zero revents, test for revents when the fd is ready + // 2. negative fd with non-zero revents, even this fd should be ignored, its + // revents should still be cleared + // 3. pipe readfd, test for revents when the fd is not ready + let mut polled = vec![ + interface::PollStruct { + fd: filefd, + events: POLLIN, + revents: 123, + }, + interface::PollStruct { + fd: -1, + events: POLLIN, + revents: 123, + }, + interface::PollStruct { + fd: pipefds.readfd, + events: POLLIN, + revents: 123, + }, + ]; + // should have exactly one fd ready (file fd) + assert_eq!(cage.poll_syscall(&mut polled.as_mut_slice(), None), 1); + assert_eq!(polled[0].revents, POLLIN); // file fd + assert_eq!(polled[1].revents, 0); // negative fd + assert_eq!(polled[2].revents, 0); // unready fd + + assert_eq!(cage.exit_syscall(EXIT_SUCCESS), EXIT_SUCCESS); + lindrustfinalize(); + } + + #[test] + pub fn ut_lind_net_poll_timeout() { + // this test is used for testing poll with timeout behaviors specifically + + // acquiring a lock on TESTMUTEX prevents other tests from running concurrently, // and also performs clean env setup let _thelock = setup::lock_and_init(); let cage = interface::cagetable_getref(1); + // subtest 1: poll when timeout could expire + // create a TCP AF_INET socket + let serversockfd = cage.socket_syscall(AF_INET, SOCK_STREAM, 0); + let clientsockfd = cage.socket_syscall(AF_INET, SOCK_STREAM, 0); + assert!(serversockfd > 0); + assert!(clientsockfd > 0); + + let port: u16 = generate_random_port(); + let sockaddr = interface::SockaddrV4 { + sin_family: AF_INET as u16, + sin_port: port.to_be(), + sin_addr: interface::V4Addr { + s_addr: u32::from_ne_bytes([127, 0, 0, 1]), + }, + padding: 0, + }; + let socket = interface::GenSockaddr::V4(sockaddr); //127.0.0.1 from bytes above + + // server bind and listen + assert_eq!(cage.bind_syscall(serversockfd, &socket), 0); + assert_eq!(cage.listen_syscall(serversockfd, 4), 0); + + assert_eq!(cage.fork_syscall(2), 0); + assert_eq!(cage.close_syscall(clientsockfd), 0); + + // this barrier is used for preventing + // an unfixed bug (`close` could block when other thread/cage is `accept`) from + // deadlocking the test + let barrier = Arc::new(Barrier::new(2)); + let barrier_2 = barrier.clone(); + + //client connects to the server to send and recv data... + let threadclient = interface::helper_thread(move || { + let cage2 = interface::cagetable_getref(2); + assert_eq!(cage2.close_syscall(serversockfd), 0); + + barrier_2.wait(); + + // connect to the server + assert_eq!(cage2.connect_syscall(clientsockfd, &socket), 0); + + // wait for 100ms + interface::sleep(interface::RustDuration::from_millis(100)); + + // send some message to client + assert_eq!(cage2.send_syscall(clientsockfd, str2cbuf("test"), 4, 0), 4); + + assert_eq!(cage2.close_syscall(clientsockfd), 0); + cage2.exit_syscall(EXIT_SUCCESS); + }); + + // make sure client thread closed the duplicated socket before server start to + // accept + barrier.wait(); + + // wait for client to connect + let mut sockgarbage = interface::GenSockaddr::V4(interface::SockaddrV4::default()); + let sockfd = cage.accept_syscall(serversockfd as i32, &mut sockgarbage); + + // create PollStruct + let mut polled = vec![interface::PollStruct { + fd: sockfd, + events: POLLIN, + revents: 0, + }]; + + // this counter is used for recording how many times do poll returns due to + // timeout + let mut counter = 0; + + loop { + let poll_result = cage.poll_syscall( + &mut polled.as_mut_slice(), + Some(interface::RustDuration::new(0, 10000000)), // 10ms + ); + assert!(poll_result >= 0); + // poll timeout after 10ms, but client will send messages after 100ms + // so there should be some timeout return + if poll_result == 0 { + counter += 1; + } else if polled[0].revents & POLLIN != 0 { + // just received the message, check the message and break + let mut buf = sizecbuf(4); + assert_eq!(cage.recv_syscall(sockfd, buf.as_mut_ptr(), 4, 0), 4); + assert_eq!(cbuf2str(&buf), "test"); + break; + } else { + unreachable!(); + } + } + // check if poll timeout correctly + assert!(counter > 0); + + threadclient.join().unwrap(); + + // subtest 2: poll when all arguments were None except for timeout + // since no set is passed into `poll`, `poll` here should behave like + // `sleep` + let start_time = interface::starttimer(); + let timeout = interface::RustDuration::new(0, 10000000); // 10ms + let poll_result = cage.poll_syscall(&mut vec![].as_mut_slice(), Some(timeout)); + assert!(poll_result == 0); + // should wait for at least 10ms + assert!(interface::readtimer(start_time) >= timeout); + + assert_eq!(cage.exit_syscall(EXIT_SUCCESS), EXIT_SUCCESS); + lindrustfinalize(); + } + + #[test] + #[ignore] + pub fn ut_lind_net_poll() { + // test for poll monitoring on multiple different file descriptors: + // 1. regular file + // 2. AF_INET server socket waiting for two clients + // 3. AF_INET server socket's connection file descriptor with clients + // 4. AF_UNIX server socket's connection file descriptor with a client + // 5. pipe + + // acquiring a lock on TESTMUTEX prevents other tests from running concurrently, + // and also performs clean env setup + let _thelock = setup::lock_and_init(); + let cage = interface::cagetable_getref(1); + + // creating regular file's file descriptor let filefd = cage.open_syscall("/netpolltest.txt", O_CREAT | O_EXCL | O_RDWR, S_IRWXA); assert!(filefd > 0); + // creating socket file descriptors let serversockfd = cage.socket_syscall(AF_INET, SOCK_STREAM, 0); let clientsockfd1 = cage.socket_syscall(AF_INET, SOCK_STREAM, 0); let clientsockfd2 = cage.socket_syscall(AF_INET, SOCK_STREAM, 0); + let serversockfd_unix = cage.socket_syscall(AF_UNIX, SOCK_STREAM, 0); + let clientsockfd_unix = cage.socket_syscall(AF_UNIX, SOCK_STREAM, 0); + + assert!(serversockfd > 0); + assert!(clientsockfd1 > 0); + assert!(clientsockfd2 > 0); + assert!(serversockfd_unix > 0); + assert!(clientsockfd_unix > 0); + + // creating a pipe + let mut pipefds = PipeArray { + readfd: -1, + writefd: -1, + }; + assert_eq!(cage.pipe_syscall(&mut pipefds), 0); + + // create a INET address let port: u16 = generate_random_port(); let sockaddr = interface::SockaddrV4 { @@ -694,177 +923,319 @@ pub mod net_tests { padding: 0, }; let socket = interface::GenSockaddr::V4(sockaddr); //127.0.0.1 from bytes above + + //binding to a socket + let serversockaddr_unix = + interface::new_sockaddr_unix(AF_UNIX as u16, "server_poll".as_bytes()); + let serversocket_unix = interface::GenSockaddr::Unix(serversockaddr_unix); + + let clientsockaddr_unix = + interface::new_sockaddr_unix(AF_UNIX as u16, "client_poll".as_bytes()); + let clientsocket_unix = interface::GenSockaddr::Unix(clientsockaddr_unix); + + assert_eq!(cage.bind_syscall(serversockfd_unix, &serversocket_unix), 0); + assert_eq!(cage.bind_syscall(clientsockfd_unix, &clientsocket_unix), 0); + assert_eq!(cage.listen_syscall(serversockfd_unix, 1), 0); + assert_eq!(cage.bind_syscall(serversockfd, &socket), 0); assert_eq!(cage.listen_syscall(serversockfd, 4), 0); + // create a PollStruct for each fd let serverpoll = interface::PollStruct { fd: serversockfd, events: POLLIN, revents: 0, }; + + let serverunixpoll = interface::PollStruct { + fd: serversockfd_unix, + events: POLLIN, + revents: 0, + }; + let filepoll = interface::PollStruct { fd: filefd, + events: POLLIN | POLLOUT, + revents: 0, + }; + + let pipepoll = interface::PollStruct { + fd: pipefds.readfd, events: POLLIN, revents: 0, }; - let mut polled = vec![serverpoll, filepoll]; - cage.fork_syscall(2); - //client 1 connects to the server to send and recv data... - let thread1 = interface::helper_thread(move || { - interface::sleep(interface::RustDuration::from_millis(30)); + let mut polled = vec![filepoll, serverpoll, serverunixpoll, pipepoll]; + + assert_eq!(cage.fork_syscall(2), 0); // used for AF_INET thread client 1 + assert_eq!(cage.fork_syscall(3), 0); // used for AF_INET thread client 2 + assert_eq!(cage.fork_syscall(4), 0); // used for AF_UNIX thread client + + assert_eq!(cage.fork_syscall(5), 0); // used for pipe thread + + assert_eq!(cage.close_syscall(clientsockfd1), 0); + assert_eq!(cage.close_syscall(clientsockfd2), 0); + assert_eq!(cage.close_syscall(clientsockfd_unix), 0); + + // this barrier have to ensure that the clients finish the connect before we do + // the poll due to an unfixed bug (`close` could block when other + // thread/cage is `accept`) + let barrier = Arc::new(Barrier::new(3)); + let barrier_clone1 = barrier.clone(); + let barrier_clone2 = barrier.clone(); + + // this barrier is used for control the flow the pipe + let barrier_pipe = Arc::new(Barrier::new(2)); + let barrier_pipe_clone = barrier_pipe.clone(); + + // due to an unfixed bug in ref counter of AF_UNIX socket pipe + // have to make sure all the threads exits only after the AF_UNIX test finished + let barrier_exit = Arc::new(Barrier::new(4)); + let barrier_exit_clone1 = barrier_exit.clone(); + let barrier_exit_clone2 = barrier_exit.clone(); + let barrier_exit_clone3 = barrier_exit.clone(); + + // client 1 connects to the server to send and recv data + let threadclient1 = interface::helper_thread(move || { let cage2 = interface::cagetable_getref(2); + assert_eq!(cage2.close_syscall(serversockfd), 0); + assert_eq!(cage2.close_syscall(clientsockfd2), 0); + // connect to server assert_eq!(cage2.connect_syscall(clientsockfd1, &socket), 0); - assert_eq!( - cage2.send_syscall(clientsockfd1, str2cbuf(&"test"), 4, 0), - 4 - ); - //giving it a longer pause time to that it can process all of the data that it - // is recieving - interface::sleep(interface::RustDuration::from_millis(100)); + barrier_clone1.wait(); - assert_eq!(cage2.close_syscall(serversockfd), 0); + // send message to server + assert_eq!(cage2.send_syscall(clientsockfd1, str2cbuf("test"), 4, 0), 4); + + interface::sleep(interface::RustDuration::from_millis(1)); + + // receive message from server + let mut buf = sizecbuf(4); + assert_eq!(cage2.recv_syscall(clientsockfd1, buf.as_mut_ptr(), 4, 0), 4); + assert_eq!(cbuf2str(&buf), "test"); + + assert_eq!(cage2.close_syscall(clientsockfd1), 0); + barrier_exit_clone1.wait(); cage2.exit_syscall(EXIT_SUCCESS); }); - cage.fork_syscall(3); - //client 2 connects to the server to send and recv data... - let thread2 = interface::helper_thread(move || { - //give it a longer time so that it can sufficiently process all of the data - interface::sleep(interface::RustDuration::from_millis(45)); + // client 2 connects to the server to send and recv data + let threadclient2 = interface::helper_thread(move || { let cage3 = interface::cagetable_getref(3); + assert_eq!(cage3.close_syscall(serversockfd), 0); + assert_eq!(cage3.close_syscall(clientsockfd1), 0); + // connect to server assert_eq!(cage3.connect_syscall(clientsockfd2, &socket), 0); + barrier_clone2.wait(); + + // send message to server + assert_eq!(cage3.send_syscall(clientsockfd2, str2cbuf("test"), 4, 0), 4); + + interface::sleep(interface::RustDuration::from_millis(1)); + + // receive message from server + let mut buf = sizecbuf(4); + let mut result: i32; + loop { + result = cage3.recv_syscall(clientsockfd2, buf.as_mut_ptr(), 4, 0); + if result != -libc::EINTR { + break; // if the error was EINTR, retry the syscall + } + } + assert_eq!(result, 4); + assert_eq!(cbuf2str(&buf), "test"); + + assert_eq!(cage3.close_syscall(clientsockfd2), 0); + barrier_exit_clone2.wait(); + cage3.exit_syscall(EXIT_SUCCESS); + }); + + let threadclient_unix = interface::helper_thread(move || { + let cage4 = interface::cagetable_getref(4); + assert_eq!(cage4.close_syscall(serversockfd_unix), 0); + assert_eq!(cage4.close_syscall(serversockfd), 0); + + // connect to server assert_eq!( - cage3.send_syscall(clientsockfd2, str2cbuf(&"test"), 4, 0), + cage4.connect_syscall(clientsockfd_unix, &serversocket_unix), + 0 + ); + + // send message to server + assert_eq!( + cage4.send_syscall(clientsockfd_unix, str2cbuf("test"), 4, 0), 4 ); - interface::sleep(interface::RustDuration::from_millis(100)); + interface::sleep(interface::RustDuration::from_millis(1)); - assert_eq!(cage3.close_syscall(serversockfd), 0); - cage3.exit_syscall(EXIT_SUCCESS); + // recieve message from server + let mut buf = sizecbuf(4); + let mut result: i32; + loop { + result = cage4.recv_syscall(clientsockfd_unix, buf.as_mut_ptr(), 4, 0); + if result != -libc::EINTR { + break; // if the error was EINTR, retry the syscall + } + } + assert_eq!(result, 4); + assert_eq!(cbuf2str(&buf), "test"); + + assert_eq!(cage4.close_syscall(clientsockfd_unix), 0); + cage4.exit_syscall(EXIT_SUCCESS); }); - //acting as the server and processing the request - let thread3 = interface::helper_thread(move || { - let mut infds: Vec; - let mut outfds: Vec; - for _counter in 0..600 { - //start a while true loop for processing requests - let pollretvalue = cage.poll_syscall( - &mut polled.as_mut_slice(), - Some(interface::RustDuration::ZERO), - ); - assert!(pollretvalue >= 0); + let thread_pipe = interface::helper_thread(move || { + let cage5 = interface::cagetable_getref(5); - infds = vec![]; - outfds = vec![]; + interface::sleep(interface::RustDuration::from_millis(1)); + // send message to pipe + assert_eq!(cage5.write_syscall(pipefds.writefd, str2cbuf("test"), 4), 4); - for polledfile in &mut polled { - if polledfile.revents & POLLIN != 0 { - infds.push(polledfile.fd); - } - if polledfile.revents & POLLOUT != 0 { - outfds.push(polledfile.fd); - } - } + let mut buf = sizecbuf(5); + // wait until peer read the message + barrier_pipe_clone.wait(); + + // read the message sent by peer + assert_eq!(cage5.read_syscall(pipefds.readfd, buf.as_mut_ptr(), 5), 5); + assert_eq!(cbuf2str(&buf), "test2"); + + barrier_exit_clone3.wait(); + cage5.exit_syscall(EXIT_SUCCESS); + }); + + barrier.wait(); + // acting as the server and processing the request + // Server loop to handle connections and I/O + // Check for any activity in any of the Input sockets + for counter in 0..600 { + let poll_result = cage.poll_syscall(&mut polled.as_mut_slice(), None); + assert!(poll_result >= 0); // check for error + + // clearfds stores the fds that should be removed from polled at the end of the + // iteration + let mut clearfds = vec![]; + // addfds stores the fds that should be added to polled at the end of the + // iteration + let mut addfds = vec![]; - //check for any activity in the input sockets - for sockfd in infds { - //If the socket returned was listerner socket, then there's a new connection - //so we accept it, and put the client socket in the list of inputs. - if sockfd == serversockfd { - let port: u16 = generate_random_port(); - let sockaddr = interface::SockaddrV4 { - sin_family: AF_INET as u16, - sin_port: port.to_be(), - sin_addr: interface::V4Addr { - s_addr: u32::from_ne_bytes([127, 0, 0, 1]), - }, - padding: 0, - }; - let mut addr = interface::GenSockaddr::V4(sockaddr); //127.0.0.1 from bytes above - - let newsockfd = cage.accept_syscall(sockfd, &mut addr); - polled.push(interface::PollStruct { - fd: newsockfd, - events: POLLIN, + // check for readfds + for poll in &mut polled { + // If the socket returned was listerner socket, then there's a new conn., so we + // accept it, and put the client socket in the list of Inputs. + if poll.fd == serversockfd { + if poll.revents & POLLIN != 0 { + let mut sockgarbage = + interface::GenSockaddr::V4(interface::SockaddrV4::default()); + let sockfd = cage.accept_syscall(poll.fd as i32, &mut sockgarbage); + assert!(sockfd > 0); + // new connection is estalished, add it to readfds and writefds + + addfds.push(interface::PollStruct { + fd: sockfd, + events: POLLIN | POLLOUT, revents: 0, - }) - } else if sockfd == filefd { - //Write to a file... - assert_eq!(cage.write_syscall(sockfd, str2cbuf("test"), 4), 4); - assert_eq!(cage.lseek_syscall(sockfd, 0, SEEK_SET), 0); - //Once the write is successful into a file, modify the file descriptor so - // that its ready for reading out of the file. - for polledfile in &mut polled { - if polledfile.fd == sockfd { - polledfile.events = POLLOUT; - break; - } - } - } else { + }); + } + } else if poll.fd == filefd { + // poll on regular file should always success + // therefore revents should be set for filefd at the first iteration + assert_eq!(counter, 0); + assert_eq!(poll.revents, POLLIN | POLLOUT); + // remove file fd from poll + clearfds.push(filefd); + } else if poll.fd == serversockfd_unix { + if poll.revents & POLLIN != 0 { + // unix socket + let mut sockgarbage = interface::GenSockaddr::Unix( + interface::new_sockaddr_unix(AF_UNIX as u16, "".as_bytes()), + ); + let sockfd = cage.accept_syscall(poll.fd as i32, &mut sockgarbage); + assert!(sockfd > 0); + // new connection is estalished, add it to poll + addfds.push(interface::PollStruct { + fd: sockfd, + events: POLLIN | POLLOUT, + revents: 0, + }); + } + } else if poll.fd == pipefds.readfd { + if poll.revents & POLLIN != 0 { + // pipe + let mut buf = sizecbuf(4); + // read the message from peer + assert_eq!(cage.read_syscall(pipefds.readfd, buf.as_mut_ptr(), 4), 4); + assert_eq!(cbuf2str(&buf), "test"); + + // write the message from peer + assert_eq!( + cage.write_syscall(pipefds.writefd, str2cbuf("test2"), 5) as usize, + 5 + ); + barrier_pipe.wait(); + + // pipe poll test done + clearfds.push(pipefds.readfd); + } + } else { + if poll.revents & POLLIN != 0 { //If the socket is in established conn., then we recv the data. If there's // no data, then close the client socket. let mut buf = sizecbuf(4); - let mut result: i32; + let mut recvresult: i32; loop { - result = cage.recv_syscall(sockfd, buf.as_mut_ptr(), 4, 0); - if result != -libc::EINTR { - assert_eq!(result & !4, 0); //This must be 0 or 4 to be correct, either the socket is good for - // recieving or it's closed + // receive message from peer + recvresult = cage.recv_syscall(poll.fd as i32, buf.as_mut_ptr(), 4, 0); + if recvresult != -libc::EINTR { break; // if the error was EINTR, retry the // syscall } } - if result == 4 { - assert_eq!(cbuf2str(&buf), "test"); - //This socket is ready for writing, modify the socket descriptor to be - // in read-write mode. This socket can write data out to network - for polledfile in &mut polled { - if polledfile.fd == sockfd { - polledfile.events = POLLOUT; - break; - } + if recvresult == 4 { + if cbuf2str(&buf) == "test" { + continue; } - } else { - //No data means remote socket closed, hence close the client socket in - // server, also remove this socket from polling. - assert_eq!(cage.close_syscall(sockfd), 0); - polled.retain(|x| x.fd != sockfd); + } else if recvresult == -libc::ECONNRESET { + // peer closed the connection + println!("Connection reset by peer on socket {}", poll.fd); + assert_eq!(cage.close_syscall(poll.fd as i32), 0); + clearfds.push(poll.fd); } } - } - - for sockfd in outfds { - if sockfd == filefd { - let mut read_buf1 = sizecbuf(4); - assert_eq!(cage.read_syscall(sockfd, read_buf1.as_mut_ptr(), 4), 4); - assert_eq!(cbuf2str(&read_buf1), "test"); - //test for file finished, remove from polling. - polled.retain(|x| x.fd != sockfd); - } else { - //Data is sent out of this socket, it's no longer ready for writing, modify - // it only read mode. - assert_eq!(cage.send_syscall(sockfd, str2cbuf(&"test"), 4, 0), 4); - for polledfile in &mut polled { - if polledfile.fd == sockfd { - polledfile.events = POLLIN; - } - } + if poll.revents & POLLOUT != 0 { + // Data is sent out this socket, it's no longer ready for writing + // clear the POLLOUT from events + assert_eq!(cage.send_syscall(poll.fd as i32, str2cbuf("test"), 4, 0), 4); + poll.events &= !POLLOUT; } } } - assert_eq!(cage.close_syscall(serversockfd), 0); - assert_eq!(cage.exit_syscall(EXIT_SUCCESS), EXIT_SUCCESS); - }); + // clear fds + polled.retain(|x| { + for fd in &clearfds { + if *fd == x.fd { + return false; + } + } + return true; + }); + // add new fds + polled.extend(addfds); + } + assert_eq!(cage.close_syscall(serversockfd), 0); + assert_eq!(cage.close_syscall(serversockfd_unix), 0); - thread1.join().unwrap(); - thread2.join().unwrap(); - thread3.join().unwrap(); + // let threads exit + barrier_exit.wait(); + threadclient1.join().unwrap(); + threadclient2.join().unwrap(); + threadclient_unix.join().unwrap(); + thread_pipe.join().unwrap(); + + assert_eq!(cage.exit_syscall(EXIT_SUCCESS), EXIT_SUCCESS); lindrustfinalize(); }