changeset 1074:10f198d4a308

Make main socket nonblocking. Limit writequeue size.
author Matt Johnston <matt@ucc.asn.au>
date Fri, 20 Mar 2015 23:36:42 +0800
parents 88043f9d40bd
children cc6116cc0b5c
files channel.h common-channel.c common-session.c packet.c packet.h session.h
diffstat 6 files changed, 41 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/channel.h	Fri Mar 20 23:33:45 2015 +0800
+++ b/channel.h	Fri Mar 20 23:36:42 2015 +0800
@@ -106,7 +106,7 @@
 
 void chaninitialise(const struct ChanType *chantypes[]);
 void chancleanup();
-void setchannelfds(fd_set *readfd, fd_set *writefd);
+void setchannelfds(fd_set *readfds, fd_set *writefds, int allow_reads);
 void channelio(fd_set *readfd, fd_set *writefd);
 struct Channel* getchannel();
 /* Returns an arbitrary channel that is in a ready state - not
--- a/common-channel.c	Fri Mar 20 23:33:45 2015 +0800
+++ b/common-channel.c	Fri Mar 20 23:36:42 2015 +0800
@@ -531,7 +531,7 @@
 
 /* Set the file descriptors for the main select in session.c
  * This avoid channels which don't have any window available, are closed, etc*/
-void setchannelfds(fd_set *readfds, fd_set *writefds) {
+void setchannelfds(fd_set *readfds, fd_set *writefds, int allow_reads) {
 	
 	unsigned int i;
 	struct Channel * channel;
@@ -549,7 +549,7 @@
 		FD if there's the possibility of "~."" to kill an 
 		interactive session (the read_mangler) */
 		if (channel->transwindow > 0
-		   && (ses.dataallowed || channel->read_mangler)) {
+		   && ((ses.dataallowed && allow_reads) || channel->read_mangler)) {
 
 			if (channel->readfd >= 0) {
 				FD_SET(channel->readfd, readfds);
--- a/common-session.c	Fri Mar 20 23:33:45 2015 +0800
+++ b/common-session.c	Fri Mar 20 23:36:42 2015 +0800
@@ -64,6 +64,13 @@
 	ses.sock_out = sock_out;
 	ses.maxfd = MAX(sock_in, sock_out);
 
+	if (sock_in >= 0) {
+		setnonblocking(sock_in);
+	}
+	if (sock_out >= 0) {
+		setnonblocking(sock_out);
+	}
+
 	ses.socket_prio = DROPBEAR_PRIO_DEFAULT;
 	/* Sets it to lowdelay */
 	update_channel_prio();
@@ -145,6 +152,7 @@
 
 	/* main loop, select()s for all sockets in use */
 	for(;;) {
+		const int writequeue_has_space = (ses.writequeue_len <= 2*TRANS_MAX_PAYLOAD_LEN);
 
 		timeout.tv_sec = select_timeout();
 		timeout.tv_usec = 0;
@@ -155,8 +163,12 @@
 		/* We delay reading from the input socket during initial setup until
 		after we have written out our initial KEXINIT packet (empty writequeue). 
 		This means our initial packet can be in-flight while we're doing a blocking
-		read for the remote ident */
-		if (ses.sock_in != -1 && (ses.remoteident || isempty(&ses.writequeue))) {
+		read for the remote ident.
+		We also avoid reading from the socket if the writequeue is full, that avoids
+		replies backing up */
+		if (ses.sock_in != -1 
+			&& (ses.remoteident || isempty(&ses.writequeue)) 
+			&& writequeue_has_space) {
 			FD_SET(ses.sock_in, &readfd);
 		}
 		if (ses.sock_out != -1 && !isempty(&ses.writequeue)) {
@@ -168,7 +180,7 @@
 		FD_SET(ses.signal_pipe[0], &readfd);
 
 		/* set up for channels which can be read/written */
-		setchannelfds(&readfd, &writefd);
+		setchannelfds(&readfd, &writefd, writequeue_has_space);
 
 		/* Pending connections to test */
 		set_connect_fds(&writefd);
@@ -318,9 +330,7 @@
 void send_session_identification() {
 	buffer *writebuf = buf_new(strlen(LOCAL_IDENT "\r\n") + 1);
 	buf_putbytes(writebuf, LOCAL_IDENT "\r\n", strlen(LOCAL_IDENT "\r\n"));
-	buf_putbyte(writebuf, 0x0); /* packet type */
-	buf_setpos(writebuf, 0);
-	enqueue(&ses.writequeue, writebuf);
+	writebuf_enqueue(writebuf, 0);
 }
 
 static void read_session_identification() {
--- a/packet.c	Fri Mar 20 23:33:45 2015 +0800
+++ b/packet.c	Fri Mar 20 23:36:42 2015 +0800
@@ -59,7 +59,7 @@
 	ssize_t written;
 #ifdef HAVE_WRITEV
 	/* 50 is somewhat arbitrary */
-	int iov_count = 50;
+	unsigned int iov_count = 50;
 	struct iovec iov[50];
 #endif
 	
@@ -83,6 +83,7 @@
 	}
 
 	packet_queue_consume(&ses.writequeue, written);
+	ses.writequeue_len -= written;
 
 	if (written == 0) {
 		ses.remoteclosed();
@@ -113,6 +114,8 @@
 		ses.remoteclosed();
 	}
 
+	ses.writequeue_len -= written;
+
 	if (written == len) {
 		/* We've finished with the packet, free it */
 		dequeue(&ses.writequeue);
@@ -570,15 +573,12 @@
     /* stick the MAC on it */
     buf_putbytes(writebuf, mac_bytes, mac_size);
 
-	/* The last byte of the buffer stores the cleartext packet_type. It is not
-	 * transmitted but is used for transmit timeout purposes */
-	buf_putbyte(writebuf, packet_type);
-	/* enqueue the packet for sending. It will get freed after transmission. */
-	buf_setpos(writebuf, 0);
-	enqueue(&ses.writequeue, (void*)writebuf);
+	/* Update counts */
+	ses.kexstate.datatrans += writebuf->len;
+
+	writebuf_enqueue(writebuf, packet_type);
 
 	/* Update counts */
-	ses.kexstate.datatrans += writebuf->len;
 	ses.transseq++;
 
 	now = monotonic_now();
@@ -596,6 +596,16 @@
 	TRACE2(("leave encrypt_packet()"))
 }
 
+void writebuf_enqueue(buffer * writebuf, unsigned char packet_type) {
+	/* The last byte of the buffer stores the cleartext packet_type. It is not
+	 * transmitted but is used for transmit timeout purposes */
+	buf_putbyte(writebuf, packet_type);
+	/* enqueue the packet for sending. It will get freed after transmission. */
+	buf_setpos(writebuf, 0);
+	enqueue(&ses.writequeue, (void*)writebuf);
+	ses.writequeue_len += writebuf->len-1;
+}
+
 
 /* Create the packet mac, and append H(seqno|clearbuf) to the output */
 /* output_mac must have ses.keys->trans.algo_mac->hashsize bytes. */
--- a/packet.h	Fri Mar 20 23:33:45 2015 +0800
+++ b/packet.h	Fri Mar 20 23:36:42 2015 +0800
@@ -28,12 +28,15 @@
 
 #include "includes.h"
 #include "queue.h"
+#include "buffer.h"
 
 void write_packet();
 void read_packet();
 void decrypt_packet();
 void encrypt_packet();
 
+void writebuf_enqueue(buffer * writebuf, unsigned char packet_type);
+
 void process_packet();
 
 void maybe_flush_reply_queue();
--- a/session.h	Fri Mar 20 23:33:45 2015 +0800
+++ b/session.h	Fri Mar 20 23:36:42 2015 +0800
@@ -125,6 +125,7 @@
 							 throughout the code, as handlers fill out this
 							 buffer with the packet to send. */
 	struct Queue writequeue; /* A queue of encrypted packets to send */
+	unsigned int writequeue_len; /* Number of bytes pending to send in writequeue */
 	buffer *readbuf; /* From the wire, decrypted in-place */
 	buffer *payload; /* Post-decompression, the actual SSH packet. 
 						May have extra data at the beginning, will be