No, it's a bug in either the pipe code or the CPU scheduler I'd say.
You could try backing out to the 2.5.40 pipe implementation; not sure if
that would tell us much though.
Here's a backout patch:
 fs/pipe.c                 |  323 +++++++++++++++++++++++++---------------------
 include/linux/pipe_fs_i.h |    2 
 2 files changed, 183 insertions(+), 142 deletions(-)
--- 2.5.43/include/linux/pipe_fs_i.h~pipe-backout	Thu Oct 17 00:28:13 2002
+++ 2.5.43-akpm/include/linux/pipe_fs_i.h	Thu Oct 17 00:28:25 2002
@@ -9,6 +9,7 @@ struct pipe_inode_info {
 	unsigned int start;
 	unsigned int readers;
 	unsigned int writers;
+	unsigned int waiting_readers;
 	unsigned int waiting_writers;
 	unsigned int r_counter;
 	unsigned int w_counter;
@@ -27,6 +28,7 @@ struct pipe_inode_info {
 #define PIPE_LEN(inode)		((inode).i_pipe->len)
 #define PIPE_READERS(inode)	((inode).i_pipe->readers)
 #define PIPE_WRITERS(inode)	((inode).i_pipe->writers)
+#define PIPE_WAITING_READERS(inode)	((inode).i_pipe->waiting_readers)
 #define PIPE_WAITING_WRITERS(inode)	((inode).i_pipe->waiting_writers)
 #define PIPE_RCOUNTER(inode)	((inode).i_pipe->r_counter)
 #define PIPE_WCOUNTER(inode)	((inode).i_pipe->w_counter)
--- 2.5.43/fs/pipe.c~pipe-backout	Thu Oct 17 00:28:16 2002
+++ 2.5.43-akpm/fs/pipe.c	Thu Oct 17 00:28:20 2002
@@ -25,9 +25,6 @@
  *
  * FIFOs and Pipes now generate SIGIO for both readers and writers.
  * -- Jeremy Elson <jelson@circlemud.org> 2001-08-16
- *
- * pipe_read & write cleanup
- * -- Manfred Spraul <manfred@colorfullife.com> 2002-05-09
  */
 
 /* Drop the inode semaphore and wait for a pipe event, atomically */
@@ -47,81 +44,97 @@ static ssize_t
 pipe_read(struct file *filp, char *buf, size_t count, loff_t *ppos)
 {
 	struct inode *inode = filp->f_dentry->d_inode;
-	int do_wakeup;
-	ssize_t ret;
+	ssize_t size, read, ret;
 
-	/* pread is not allowed on pipes. */
-	if (unlikely(ppos != &filp->f_pos))
-		return -ESPIPE;
-	
-	/* Null read succeeds. */
-	if (unlikely(count == 0))
-		return 0;
+	/* Seeks are not allowed on pipes.  */
+	ret = -ESPIPE;
+	read = 0;
+	if (ppos != &filp->f_pos)
+		goto out_nolock;
 
-	do_wakeup = 0;
+	/* Always return 0 on null read.  */
 	ret = 0;
-	down(PIPE_SEM(*inode));
-	for (;;) {
-		int size = PIPE_LEN(*inode);
-		if (size) {
-			char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode);
-			ssize_t chars = PIPE_MAX_RCHUNK(*inode);
-
-			if (chars > count)
-				chars = count;
-			if (chars > size)
-				chars = size;
-
-			if (copy_to_user(buf, pipebuf, chars)) {
-				if (!ret) ret = -EFAULT;
-				break;
-			}
-			ret += chars;
+	if (count == 0)
+		goto out_nolock;
 
-			PIPE_START(*inode) += chars;
-			PIPE_START(*inode) &= (PIPE_SIZE - 1);
-			PIPE_LEN(*inode) -= chars;
-			count -= chars;
-			buf += chars;
-			do_wakeup = 1;
-		}
-		if (!count)
-			break;	/* common path: read succeeded */
-		if (PIPE_LEN(*inode)) /* test for cyclic buffers */
-			continue;
+	/* Get the pipe semaphore */
+	ret = -ERESTARTSYS;
+	if (down_interruptible(PIPE_SEM(*inode)))
+		goto out_nolock;
+
+	if (PIPE_EMPTY(*inode)) {
+do_more_read:
+		ret = 0;
 		if (!PIPE_WRITERS(*inode))
-			break;
-		if (!PIPE_WAITING_WRITERS(*inode)) {
-			/* syscall merging: Usually we must not sleep
-			 * if O_NONBLOCK is set, or if we got some data.
-			 * But if a writer sleeps in kernel space, then
-			 * we can wait for that data without violating POSIX.
-			 */
-			if (ret)
-				break;
-			if (filp->f_flags & O_NONBLOCK) {
-				ret = -EAGAIN;
+			goto out;
+
+		ret = -EAGAIN;
+		if (filp->f_flags & O_NONBLOCK)
+			goto out;
+
+		for (;;) {
+			PIPE_WAITING_READERS(*inode)++;
+			pipe_wait(inode);
+			PIPE_WAITING_READERS(*inode)--;
+			ret = -ERESTARTSYS;
+			if (signal_pending(current))
+				goto out;
+			ret = 0;
+			if (!PIPE_EMPTY(*inode))
 				break;
-			}
+			if (!PIPE_WRITERS(*inode))
+				goto out;
 		}
-		if (signal_pending(current)) {
-			if (!ret) ret = -ERESTARTSYS;
-			break;
-		}
-		if (do_wakeup) {
-			wake_up_interruptible(PIPE_WAIT(*inode));
- 			kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT);
-		}
-		pipe_wait(inode);
 	}
-	up(PIPE_SEM(*inode));
-	/* Signal writers asynchronously that there is more room.  */
-	if (do_wakeup) {
+
+	/* Read what data is available.  */
+	ret = -EFAULT;
+	while (count > 0 && (size = PIPE_LEN(*inode))) {
+		char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode);
+		ssize_t chars = PIPE_MAX_RCHUNK(*inode);
+
+		if (chars > count)
+			chars = count;
+		if (chars > size)
+			chars = size;
+
+		if (copy_to_user(buf, pipebuf, chars))
+			goto out;
+
+		read += chars;
+		PIPE_START(*inode) += chars;
+		PIPE_START(*inode) &= (PIPE_SIZE - 1);
+		PIPE_LEN(*inode) -= chars;
+		count -= chars;
+		buf += chars;
+	}
+
+	/* Cache behaviour optimization */
+	if (!PIPE_LEN(*inode))
+		PIPE_START(*inode) = 0;
+
+	if (count && PIPE_WAITING_WRITERS(*inode) && !(filp->f_flags & O_NONBLOCK)) {
+		/*
+		 * We know that we are going to sleep: signal
+		 * writers synchronously that there is more
+		 * room.
+		 */
 		wake_up_interruptible_sync(PIPE_WAIT(*inode));
-		kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT);
+ 		kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT);
+		if (!PIPE_EMPTY(*inode))
+			BUG();
+		goto do_more_read;
 	}
-	if (ret > 0)
-		UPDATE_ATIME(inode);
+	/* Signal writers asynchronously that there is more room.  */
+	wake_up_interruptible(PIPE_WAIT(*inode));
+ 	kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT);
+
+	ret = read;
+out:
+	up(PIPE_SEM(*inode));
+out_nolock:
+	if (read)
+		ret = read;
 	return ret;
 }
 
@@ -129,90 +142,116 @@ static ssize_t
 pipe_write(struct file *filp, const char *buf, size_t count, loff_t *ppos)
 {
 	struct inode *inode = filp->f_dentry->d_inode;
-	ssize_t ret;
-	size_t min;
-	int do_wakeup;
-
-	/* pwrite is not allowed on pipes. */
-	if (unlikely(ppos != &filp->f_pos))
-		return -ESPIPE;
-	
-	/* Null write succeeds. */
-	if (unlikely(count == 0))
-		return 0;
+	ssize_t free, written, ret;
 
-	do_wakeup = 0;
+	/* Seeks are not allowed on pipes.  */
+	ret = -ESPIPE;
+	written = 0;
+	if (ppos != &filp->f_pos)
+		goto out_nolock;
+
+	/* Null write succeeds.  */
 	ret = 0;
-	min = count;
-	if (min > PIPE_BUF)
-		min = 1;
-	down(PIPE_SEM(*inode));
-	for (;;) {
-		int free;
-		if (!PIPE_READERS(*inode)) {
-			send_sig(SIGPIPE, current, 0);
-			if (!ret) ret = -EPIPE;
-			break;
+	if (count == 0)
+		goto out_nolock;
+
+	ret = -ERESTARTSYS;
+	if (down_interruptible(PIPE_SEM(*inode)))
+		goto out_nolock;
+
+	/* No readers yields SIGPIPE.  */
+	if (!PIPE_READERS(*inode))
+		goto sigpipe;
+
+	/* If count <= PIPE_BUF, we have to make it atomic.  */
+	free = (count <= PIPE_BUF ? count : 1);
+
+	/* Wait, or check for, available space.  */
+	if (filp->f_flags & O_NONBLOCK) {
+		ret = -EAGAIN;
+		if (PIPE_FREE(*inode) < free)
+			goto out;
+	} else {
+		while (PIPE_FREE(*inode) < free) {
+			PIPE_WAITING_WRITERS(*inode)++;
+			pipe_wait(inode);
+			PIPE_WAITING_WRITERS(*inode)--;
+			ret = -ERESTARTSYS;
+			if (signal_pending(current))
+				goto out;
+
+			if (!PIPE_READERS(*inode))
+				goto sigpipe;
 		}
-		free = PIPE_FREE(*inode);
-		if (free >= min) {
-			/* transfer data */
-			ssize_t chars = PIPE_MAX_WCHUNK(*inode);
-			char *pipebuf = PIPE_BASE(*inode) + PIPE_END(*inode);
-			/* Always wakeup, even if the copy fails. Otherwise
-			 * we lock up (O_NONBLOCK-)readers that sleep due to
-			 * syscall merging.
-			 */
-			do_wakeup = 1;
+	}
+
+	/* Copy into available space.  */
+	ret = -EFAULT;
+	while (count > 0) {
+		int space;
+		char *pipebuf = PIPE_BASE(*inode) + PIPE_END(*inode);
+		ssize_t chars = PIPE_MAX_WCHUNK(*inode);
+
+		if ((space = PIPE_FREE(*inode)) != 0) {
 			if (chars > count)
 				chars = count;
-			if (chars > free)
-				chars = free;
+			if (chars > space)
+				chars = space;
 
-			if (copy_from_user(pipebuf, buf, chars)) {
-				if (!ret) ret = -EFAULT;
-				break;
-			}
+			if (copy_from_user(pipebuf, buf, chars))
+				goto out;
 
-			ret += chars;
+			written += chars;
 			PIPE_LEN(*inode) += chars;
 			count -= chars;
 			buf += chars;
-		}
-		if (!count)
-			break;
-		if (PIPE_FREE(*inode) && ret) {
-			/* handle cyclic data buffers */
-			min = 1;
+			space = PIPE_FREE(*inode);
 			continue;
 		}
-		if (filp->f_flags & O_NONBLOCK) {
-			if (!ret) ret = -EAGAIN;
-			break;
-		}
-		if (signal_pending(current)) {
-			if (!ret) ret = -ERESTARTSYS;
+
+		ret = written;
+		if (filp->f_flags & O_NONBLOCK)
 			break;
-		}
-		if (do_wakeup) {
+
+		do {
+			/*
+			 * Synchronous wake-up: it knows that this process
+			 * is going to give up this CPU, so it doesn't have
+			 * to do idle reschedules.
+			 */
 			wake_up_interruptible_sync(PIPE_WAIT(*inode));
-			kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
-			do_wakeup = 0;
-		}
-		PIPE_WAITING_WRITERS(*inode)++;
-		pipe_wait(inode);
-		PIPE_WAITING_WRITERS(*inode)--;
+ 			kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
+			PIPE_WAITING_WRITERS(*inode)++;
+			pipe_wait(inode);
+			PIPE_WAITING_WRITERS(*inode)--;
+			if (signal_pending(current))
+				goto out;
+			if (!PIPE_READERS(*inode))
+				goto sigpipe;
+		} while (!PIPE_FREE(*inode));
+		ret = -EFAULT;
 	}
+
+	/* Signal readers asynchronously that there is more data.  */
+	wake_up_interruptible(PIPE_WAIT(*inode));
+	kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
+
+	inode->i_ctime = inode->i_mtime = CURRENT_TIME;
+	mark_inode_dirty(inode);
+
+out:
 	up(PIPE_SEM(*inode));
-	if (do_wakeup) {
-		wake_up_interruptible(PIPE_WAIT(*inode));
-		kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
-	}
-	if (ret > 0) {
-		inode->i_ctime = inode->i_mtime = CURRENT_TIME;
-		mark_inode_dirty(inode);
-	}
+out_nolock:
+	if (written)
+		ret = written;
 	return ret;
+
+sigpipe:
+	if (written)
+		goto out;
+	up(PIPE_SEM(*inode));
+	send_sig(SIGPIPE, current, 0);
+	return -EPIPE;
 }
 
 static ssize_t
@@ -412,7 +451,7 @@ struct file_operations read_fifo_fops = 
 	.ioctl		= pipe_ioctl,
 	.open		= pipe_read_open,
 	.release	= pipe_read_release,
-	.fasync		= pipe_read_fasync,
+	.fasync         = pipe_read_fasync,
 };
 
 struct file_operations write_fifo_fops = {
@@ -423,7 +462,7 @@ struct file_operations write_fifo_fops =
 	.ioctl		= pipe_ioctl,
 	.open		= pipe_write_open,
 	.release	= pipe_write_release,
-	.fasync		= pipe_write_fasync,
+	.fasync         = pipe_write_fasync,
 };
 
 struct file_operations rdwr_fifo_fops = {
@@ -434,7 +473,7 @@ struct file_operations rdwr_fifo_fops = 
 	.ioctl		= pipe_ioctl,
 	.open		= pipe_rdwr_open,
 	.release	= pipe_rdwr_release,
-	.fasync		= pipe_rdwr_fasync,
+	.fasync         = pipe_rdwr_fasync,
 };
 
 struct file_operations read_pipe_fops = {
@@ -445,7 +484,7 @@ struct file_operations read_pipe_fops = 
 	.ioctl		= pipe_ioctl,
 	.open		= pipe_read_open,
 	.release	= pipe_read_release,
-	.fasync		= pipe_read_fasync,
+	.fasync         = pipe_read_fasync,
 };
 
 struct file_operations write_pipe_fops = {
@@ -456,7 +495,7 @@ struct file_operations write_pipe_fops =
 	.ioctl		= pipe_ioctl,
 	.open		= pipe_write_open,
 	.release	= pipe_write_release,
-	.fasync		= pipe_write_fasync,
+	.fasync         = pipe_write_fasync,
 };
 
 struct file_operations rdwr_pipe_fops = {
@@ -467,7 +506,7 @@ struct file_operations rdwr_pipe_fops = 
 	.ioctl		= pipe_ioctl,
 	.open		= pipe_rdwr_open,
 	.release	= pipe_rdwr_release,
-	.fasync		= pipe_rdwr_fasync,
+	.fasync         = pipe_rdwr_fasync,
 };
 
 struct inode* pipe_new(struct inode* inode)
@@ -486,7 +525,7 @@ struct inode* pipe_new(struct inode* ino
 	PIPE_BASE(*inode) = (char*) page;
 	PIPE_START(*inode) = PIPE_LEN(*inode) = 0;
 	PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0;
-	PIPE_WAITING_WRITERS(*inode) = 0;
+	PIPE_WAITING_READERS(*inode) = PIPE_WAITING_WRITERS(*inode) = 0;
 	PIPE_RCOUNTER(*inode) = PIPE_WCOUNTER(*inode) = 1;
 	*PIPE_FASYNC_READERS(*inode) = *PIPE_FASYNC_WRITERS(*inode) = NULL;
 
.
-
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/