io.c

説明を見る。
00001 /* -*- c-file-style: "linux" -*-
00002  *
00003  * Copyright (C) 1996-2001 by Andrew Tridgell
00004  * Copyright (C) Paul Mackerras 1996
00005  * Copyright (C) 2001, 2002 by Martin Pool <mbp@samba.org>
00006  *
00007  * This program is free software; you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation; either version 2 of the License, or
00010  * (at your option) any later version.
00011  *
00012  * This program is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU General Public License
00018  * along with this program; if not, write to the Free Software
00019  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
00020  */
00021 
00022 /**
00023  * @file io.c
00024  *
00025  * Socket and pipe I/O utilities used in rsync.
00026  *
00027  * rsync provides its own multiplexing system, which is used to send
00028  * stderr and stdout over a single socket.  We need this because
00029  * stdout normally carries the binary data stream, and stderr all our
00030  * error messages.
00031  *
00032  * For historical reasons this is off during the start of the
00033  * connection, but it's switched on quite early using
00034  * io_start_multiplex_out() and io_start_multiplex_in().
00035  **/
00036 
00037 #include "rsync.h"
00038 
00039 /** If no timeout is specified then use a 60 second select timeout */
00040 #define SELECT_TIMEOUT 60
00041
      /* Options and shared state that are defined in other source files. */
00042 extern int bwlimit;
00043 extern size_t bwlimit_writemax;
00044 extern int io_timeout;
00045 extern int allowed_lull;
00046 extern int am_server;
00047 extern int am_daemon;
00048 extern int am_sender;
00049 extern int am_generator;
00050 extern int eol_nulls;
00051 extern int read_batch;
00052 extern int csum_length;
00053 extern int checksum_seed;
00054 extern int protocol_version;
00055 extern int remove_sent_files;
00056 extern int preserve_hard_links;
00057 extern char *filesfrom_host;
00058 extern struct stats stats;
00059 extern struct file_list *the_file_list;
00060
00061 const char phase_unknown[] = "unknown";
      /* When non-zero, check_timeout() returns without checking anything. */
00062 int ignore_timeout = 0;
00063 int batch_fd = -1;
00064 int batch_gen_fd = -1;
00065
00066 /* Ignore an EOF error if non-zero. See whine_about_eof(). */
00067 int kluge_around_eof = 0;
00068
      /* File descriptors for MSG_* traffic (see set_msg_fd_in() and
       * set_msg_fd_out()) and for the main socket (see io_set_sock_fds()).
       * All start out as -1; msg2genr_flush() treats msg_fd_out < 0 as
       * "no message fd". */
00069 int msg_fd_in = -1;
00070 int msg_fd_out = -1;
00071 int sock_f_in = -1;
00072 int sock_f_out = -1;
00073
00074 static int io_multiplexing_out;
00075 static int io_multiplexing_in;
      /* last_io_in is refreshed by read_timeout() after a successful read from
       * sock_f_in (only when io_timeout is set) and is what check_timeout()
       * measures against.  last_io_out is compared with the current time by
       * maybe_flush_socket() and maybe_send_keepalive(). */
00076 static time_t last_io_in;
00077 static time_t last_io_out;
00078 static int no_flush;
00079
00080 static int write_batch_monitor_in = -1;
00081 static int write_batch_monitor_out = -1;
00082
      /* State for the --files-from forwarding performed inside read_timeout():
       * the source and destination fds, the staging buffer, a pointer to the
       * first unsent byte, the last byte that was staged, and the number of
       * bytes still waiting to be written.  Reset by io_set_filesfrom_fds(). */
00083 static int io_filesfrom_f_in = -1;
00084 static int io_filesfrom_f_out = -1;
00085 static char io_filesfrom_buf[2048];
00086 static char *io_filesfrom_bp;
00087 static char io_filesfrom_lastchar;
00088 static int io_filesfrom_buflen;
      /* When non-zero, read_msg_fd() queues messages on msg2sndr instead of
       * writing them out immediately. */
00089 static int defer_forwarding_messages = 0;
      /* Seconds to wait in each select() call; recomputed by set_io_timeout(). */
00090 static int select_timeout = SELECT_TIMEOUT;
      /* Running totals maintained by increment_active_files() and
       * decrement_active_files(). */
00091 static int active_filecnt = 0;
00092 static OFF_T active_bytecnt = 0;
00093
      /* Forward declaration: read_msg_fd() uses read_loop() before its definition. */
00094 static void read_loop(int fd, char *buf, size_t len);
00095 
/* One queued file-list index; items are chained into a singly linked list. */
struct flist_ndx_item {
        struct flist_ndx_item *next;    /* following item, or NULL at the tail */
        int ndx;                        /* the queued index value */
};

/* A FIFO of file-list indexes: flist_ndx_push() appends at the tail and
 * flist_ndx_pop() removes from the head.  Both pointers are NULL when the
 * list is empty. */
struct flist_ndx_list {
        struct flist_ndx_item *head;
        struct flist_ndx_item *tail;
};

/* Queues filled by read_msg_fd(): indexes to redo (drained by get_redo_num())
 * and indexes for hard-link processing (drained by get_hlink_num()). */
static struct flist_ndx_list redo_list;
static struct flist_ndx_list hlink_list;
00106 
/* One queued MSG_* packet.  msg_list_add() over-allocates the item so that
 * buf can hold the 4-byte multiplex header followed by the payload; len is
 * the total number of bytes stored in buf. */
struct msg_list_item {
        struct msg_list_item *next;
        int len;
        char buf[1];
};

/* A FIFO of pending packets; head and tail are both NULL when it is empty. */
struct msg_list {
        struct msg_list_item *head;
        struct msg_list_item *tail;
};

/* msg2genr is filled by send_msg() and drained by msg2genr_flush();
 * msg2sndr collects messages that read_msg_fd() defers. */
static struct msg_list msg2genr;
static struct msg_list msg2sndr;
00118 
00119 static void flist_ndx_push(struct flist_ndx_list *lp, int ndx)
00120 {
00121         struct flist_ndx_item *item;
00122 
00123         if (!(item = new(struct flist_ndx_item)))
00124                 out_of_memory("flist_ndx_push");
00125         item->next = NULL;
00126         item->ndx = ndx;
00127         if (lp->tail)
00128                 lp->tail->next = item;
00129         else
00130                 lp->head = item;
00131         lp->tail = item;
00132 }
00133 
00134 static int flist_ndx_pop(struct flist_ndx_list *lp)
00135 {
00136         struct flist_ndx_item *next;
00137         int ndx;
00138 
00139         if (!lp->head)
00140                 return -1;
00141 
00142         ndx = lp->head->ndx;
00143         next = lp->head->next;
00144         free(lp->head);
00145         lp->head = next;
00146         if (!next)
00147                 lp->tail = NULL;
00148 
00149         return ndx;
00150 }
00151 
00152 static void check_timeout(void)
00153 {
00154         time_t t;
00155 
00156         if (!io_timeout || ignore_timeout)
00157                 return;
00158 
00159         if (!last_io_in) {
00160                 last_io_in = time(NULL);
00161                 return;
00162         }
00163 
00164         t = time(NULL);
00165 
00166         if (t - last_io_in >= io_timeout) {
00167                 if (!am_server && !am_daemon) {
00168                         rprintf(FERROR, "io timeout after %d seconds -- exiting\n",
00169                                 (int)(t-last_io_in));
00170                 }
00171                 exit_cleanup(RERR_TIMEOUT);
00172         }
00173 }
00174 
00175 /* Note the fds used for the main socket (which might really be a pipe
00176  * for a local transfer, but we can ignore that). */
00177 void io_set_sock_fds(int f_in, int f_out)
00178 {
00179         sock_f_in = f_in;
00180         sock_f_out = f_out;
00181 }
00182 
00183 void set_io_timeout(int secs)
00184 {
00185         io_timeout = secs;
00186 
00187         if (!io_timeout || io_timeout > SELECT_TIMEOUT)
00188                 select_timeout = SELECT_TIMEOUT;
00189         else
00190                 select_timeout = io_timeout;
00191 
00192         allowed_lull = read_batch ? 0 : (io_timeout + 1) / 2;
00193 }
00194 
00195 /* Setup the fd used to receive MSG_* messages.  Only needed during the
00196  * early stages of being a local sender (up through the sending of the
00197  * file list) or when we're the generator (to fetch the messages from
00198  * the receiver). */
00199 void set_msg_fd_in(int fd)
00200 {
00201         msg_fd_in = fd;
00202 }
00203 
00204 /* Setup the fd used to send our MSG_* messages.  Only needed when
00205  * we're the receiver (to send our messages to the generator). */
00206 void set_msg_fd_out(int fd)
00207 {
00208         msg_fd_out = fd;
00209         set_nonblocking(msg_fd_out);
00210 }
00211 
00212 /* Add a message to the pending MSG_* list. */
00213 static void msg_list_add(struct msg_list *lst, int code, char *buf, int len)
00214 {
00215         struct msg_list_item *m;
00216         int sz = len + 4 + sizeof m[0] - 1;
00217 
00218         if (!(m = (struct msg_list_item *)new_array(char, sz)))
00219                 out_of_memory("msg_list_add");
00220         m->next = NULL;
00221         m->len = len + 4;
00222         SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len);
00223         memcpy(m->buf + 4, buf, len);
00224         if (lst->tail)
00225                 lst->tail->next = m;
00226         else
00227                 lst->head = m;
00228         lst->tail = m;
00229 }
00230 
00231 /* Read a message from the MSG_* fd and handle it.  This is called either
00232  * during the early stages of being a local sender (up through the sending
00233  * of the file list) or when we're the generator (to fetch the messages
00234  * from the receiver). */
00235 static void read_msg_fd(void)
00236 {
00237         char buf[2048];
00238         size_t n;
00239         int fd = msg_fd_in;
00240         int tag, len;
00241 
00242         /* Temporarily disable msg_fd_in.  This is needed to avoid looping back
00243          * to this routine from writefd_unbuffered(). */
00244         msg_fd_in = -1;
00245 
00246         read_loop(fd, buf, 4);
00247         tag = IVAL(buf, 0);
00248 
00249         len = tag & 0xFFFFFF;
00250         tag = (tag >> 24) - MPLEX_BASE;
00251 
00252         switch (tag) {
00253         case MSG_DONE:
00254                 if (len != 0 || !am_generator) {
00255                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
00256                         exit_cleanup(RERR_STREAMIO);
00257                 }
00258                 flist_ndx_push(&redo_list, -1);
00259                 break;
00260         case MSG_REDO:
00261                 if (len != 4 || !am_generator) {
00262                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
00263                         exit_cleanup(RERR_STREAMIO);
00264                 }
00265                 read_loop(fd, buf, 4);
00266                 if (remove_sent_files)
00267                         decrement_active_files(IVAL(buf,0));
00268                 flist_ndx_push(&redo_list, IVAL(buf,0));
00269                 break;
00270         case MSG_DELETED:
00271                 if (len >= (int)sizeof buf || !am_generator) {
00272                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
00273                         exit_cleanup(RERR_STREAMIO);
00274                 }
00275                 read_loop(fd, buf, len);
00276                 if (defer_forwarding_messages)
00277                         msg_list_add(&msg2sndr, MSG_DELETED, buf, len);
00278                 else
00279                         io_multiplex_write(MSG_DELETED, buf, len);
00280                 break;
00281         case MSG_SUCCESS:
00282                 if (len != 4 || !am_generator) {
00283                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
00284                         exit_cleanup(RERR_STREAMIO);
00285                 }
00286                 read_loop(fd, buf, len);
00287                 if (remove_sent_files) {
00288                         decrement_active_files(IVAL(buf,0));
00289                         if (defer_forwarding_messages)
00290                                 msg_list_add(&msg2sndr, MSG_SUCCESS, buf, len);
00291                         else
00292                                 io_multiplex_write(MSG_SUCCESS, buf, len);
00293                 }
00294                 if (preserve_hard_links)
00295                         flist_ndx_push(&hlink_list, IVAL(buf,0));
00296                 break;
00297         case MSG_SOCKERR:
00298                 if (!am_generator) {
00299                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
00300                         exit_cleanup(RERR_STREAMIO);
00301                 }
00302                 close_multiplexing_out();
00303                 /* FALL THROUGH */
00304         case MSG_INFO:
00305         case MSG_ERROR:
00306         case MSG_LOG:
00307                 while (len) {
00308                         n = len;
00309                         if (n >= sizeof buf)
00310                                 n = sizeof buf - 1;
00311                         read_loop(fd, buf, n);
00312                         if (am_generator && am_server && defer_forwarding_messages)
00313                                 msg_list_add(&msg2sndr, tag, buf, n);
00314                         else
00315                                 rwrite((enum logcode)tag, buf, n);
00316                         len -= n;
00317                 }
00318                 break;
00319         default:
00320                 rprintf(FERROR, "unknown message %d:%d [%s]\n",
00321                         tag, len, who_am_i());
00322                 exit_cleanup(RERR_STREAMIO);
00323         }
00324 
00325         msg_fd_in = fd;
00326 }
00327 
00328 /* This is used by the generator to limit how many file transfers can
00329  * be active at once when --remove-sent-files is specified.  Without
00330  * this, sender-side deletions were mostly happening at the end. */
00331 void increment_active_files(int ndx, int itemizing, enum logcode code)
00332 {
00333         /* TODO: tune these limits? */
00334         while (active_filecnt >= (active_bytecnt >= 128*1024 ? 10 : 50)) {
00335                 if (hlink_list.head)
00336                         check_for_finished_hlinks(itemizing, code);
00337                 read_msg_fd();
00338         }
00339 
00340         active_filecnt++;
00341         active_bytecnt += the_file_list->files[ndx]->length;
00342 }
00343 
00344 void decrement_active_files(int ndx)
00345 {
00346         active_filecnt--;
00347         active_bytecnt -= the_file_list->files[ndx]->length;
00348 }
00349 
00350 /* Try to push messages off the list onto the wire.  If we leave with more
00351  * to do, return 0.  On error, return -1.  If everything flushed, return 1.
00352  * This is only active in the receiver. */
00353 static int msg2genr_flush(int flush_it_all)
00354 {
00355         static int written = 0;
00356         struct timeval tv;
00357         fd_set fds;
00358 
00359         if (msg_fd_out < 0)
00360                 return -1;
00361 
00362         while (msg2genr.head) {
00363                 struct msg_list_item *m = msg2genr.head;
00364                 int n = write(msg_fd_out, m->buf + written, m->len - written);
00365                 if (n < 0) {
00366                         if (errno == EINTR)
00367                                 continue;
00368                         if (errno != EWOULDBLOCK && errno != EAGAIN)
00369                                 return -1;
00370                         if (!flush_it_all)
00371                                 return 0;
00372                         FD_ZERO(&fds);
00373                         FD_SET(msg_fd_out, &fds);
00374                         tv.tv_sec = select_timeout;
00375                         tv.tv_usec = 0;
00376                         if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
00377                                 check_timeout();
00378                 } else if ((written += n) == m->len) {
00379                         msg2genr.head = m->next;
00380                         if (!msg2genr.head)
00381                                 msg2genr.tail = NULL;
00382                         free(m);
00383                         written = 0;
00384                 }
00385         }
00386         return 1;
00387 }
00388 
00389 void send_msg(enum msgcode code, char *buf, int len)
00390 {
00391         if (msg_fd_out < 0) {
00392                 io_multiplex_write(code, buf, len);
00393                 return;
00394         }
00395         msg_list_add(&msg2genr, code, buf, len);
00396         msg2genr_flush(NORMAL_FLUSH);
00397 }
00398 
00399 int get_redo_num(int itemizing, enum logcode code)
00400 {
00401         while (1) {
00402                 if (hlink_list.head)
00403                         check_for_finished_hlinks(itemizing, code);
00404                 if (redo_list.head)
00405                         break;
00406                 read_msg_fd();
00407         }
00408 
00409         return flist_ndx_pop(&redo_list);
00410 }
00411 
00412 int get_hlink_num(void)
00413 {
00414         return flist_ndx_pop(&hlink_list);
00415 }
00416 
00417 /**
00418  * When we're the receiver and we have a local --files-from list of names
00419  * that needs to be sent over the socket to the sender, we have to do two
00420  * things at the same time: send the sender a list of what files we're
00421  * processing and read the incoming file+info list from the sender.  We do
00422  * this by augmenting the read_timeout() function to copy this data.  It
00423  * uses the io_filesfrom_buf to read a block of data from f_in (when it is
00424  * ready, since it might be a pipe) and then blast it out f_out (when it
00425  * is ready to receive more data).
00426  */
00427 void io_set_filesfrom_fds(int f_in, int f_out)
00428 {
00429         io_filesfrom_f_in = f_in;
00430         io_filesfrom_f_out = f_out;
00431         io_filesfrom_bp = io_filesfrom_buf;
00432         io_filesfrom_lastchar = '\0';
00433         io_filesfrom_buflen = 0;
00434 }
00435 
00436 /* It's almost always an error to get an EOF when we're trying to read from the
00437  * network, because the protocol is (for the most part) self-terminating.
00438  *
00439  * There is one case for the receiver when it is at the end of the transfer
00440  * (hanging around reading any keep-alive packets that might come its way): if
00441  * the sender dies before the generator's kill-signal comes through, we can end
00442  * up here needing to loop until the kill-signal arrives.  In this situation,
00443  * kluge_around_eof will be < 0.
00444  *
00445  * There is another case for older protocol versions (< 24) where the module
00446  * listing was not terminated, so we must ignore an EOF error in that case and
00447  * exit.  In this situation, kluge_around_eof will be > 0. */
00448 static void whine_about_eof(int fd)
00449 {
00450         if (kluge_around_eof && fd == sock_f_in) {
00451                 int i;
00452                 if (kluge_around_eof > 0)
00453                         exit_cleanup(0);
00454                 /* If we're still here after 10 seconds, exit with an error. */
00455                 for (i = 10*1000/20; i--; )
00456                         msleep(20);
00457         }
00458 
00459         rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
00460                 "(%.0f bytes received so far) [%s]\n",
00461                 (double)stats.total_read, who_am_i());
00462 
00463         exit_cleanup(RERR_STREAMIO);
00464 }
00465 
00466 /**
00467  * Read from a socket with I/O timeout. return the number of bytes
00468  * read. If no bytes can be read then exit, never return a number <= 0.
       *
       * While waiting for data on fd this routine also services two side
       * channels: it flushes queued msg2genr messages when msg_fd_out is
       * writable, and it forwards --files-from data from io_filesfrom_f_in to
       * io_filesfrom_f_out (see io_set_filesfrom_fds()).
       *
       * @param fd   descriptor to read from
       * @param buf  destination buffer
       * @param len  maximum number of bytes to read
       * @return     number of bytes read by the one successful read() (> 0)
00469  *
00470  * TODO: If the remote shell connection fails, then current versions
00471  * actually report an "unexpected EOF" error here.  Since it's a
00472  * fairly common mistake to try to use rsh when ssh is required, we
00473  * should trap that: if we fail to read any data at all, we should
00474  * give a better explanation.  We can tell whether the connection has
00475  * started by looking e.g. at whether the remote version is known yet.
00476  */
00477 static int read_timeout(int fd, char *buf, size_t len)
00478 {
00479         int n, cnt = 0;
00480
              /* NOTE(review): presumably pushes out buffered output before we
               * wait for input; io_flush() is defined outside this section. */
00481         io_flush(NORMAL_FLUSH);
00482
00483         while (cnt == 0) {
00484                 /* until we manage to read *something* */
00485                 fd_set r_fds, w_fds;
00486                 struct timeval tv;
00487                 int maxfd = fd;
00488                 int count;
00489
00490                 FD_ZERO(&r_fds);
00491                 FD_ZERO(&w_fds);
00492                 FD_SET(fd, &r_fds);
                      /* Watch the message fd for writability only while there
                       * are queued messages to send. */
00493                 if (msg2genr.head) {
00494                         FD_SET(msg_fd_out, &w_fds);
00495                         if (msg_fd_out > maxfd)
00496                                 maxfd = msg_fd_out;
00497                 }
                      /* Files-from forwarding: with an empty staging buffer we
                       * want to read more names (or, if the input is already
                       * closed, forwarding is finished and gets disabled);
                       * with data staged we want to write it out. */
00498                 if (io_filesfrom_f_out >= 0) {
00499                         int new_fd;
00500                         if (io_filesfrom_buflen == 0) {
00501                                 if (io_filesfrom_f_in >= 0) {
00502                                         FD_SET(io_filesfrom_f_in, &r_fds);
00503                                         new_fd = io_filesfrom_f_in;
00504                                 } else {
00505                                         io_filesfrom_f_out = -1;
00506                                         new_fd = -1;
00507                                 }
00508                         } else {
00509                                 FD_SET(io_filesfrom_f_out, &w_fds);
00510                                 new_fd = io_filesfrom_f_out;
00511                         }
00512                         if (new_fd > maxfd)
00513                                 maxfd = new_fd;
00514                 }
00515
00516                 tv.tv_sec = select_timeout;
00517                 tv.tv_usec = 0;
00518
                      /* Cleared so that the EBADF test below only sees an error
                       * set by this select() call. */
00519                 errno = 0;
00520
00521                 count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
00522
                      /* Timeout or select() failure: a bad descriptor is fatal,
                       * anything else just checks the I/O timeout and retries. */
00523                 if (count <= 0) {
00524                         if (errno == EBADF)
00525                                 exit_cleanup(RERR_SOCKETIO);
00526                         check_timeout();
00527                         continue;
00528                 }
00529
00530                 if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds))
00531                         msg2genr_flush(NORMAL_FLUSH);
00532
00533                 if (io_filesfrom_f_out >= 0) {
00534                         if (io_filesfrom_buflen) {
                                      /* Staged data: write as much as the fd
                                       * accepts and advance past what was sent,
                                       * rewinding to the buffer start once it
                                       * is all gone.  A failed write stops the
                                       * forwarding. */
00535                                 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
00536                                         int l = write(io_filesfrom_f_out,
00537                                                       io_filesfrom_bp,
00538                                                       io_filesfrom_buflen);
00539                                         if (l > 0) {
00540                                                 if (!(io_filesfrom_buflen -= l))
00541                                                         io_filesfrom_bp = io_filesfrom_buf;
00542                                                 else
00543                                                         io_filesfrom_bp += l;
00544                                         } else {
00545                                                 /* XXX should we complain? */
00546                                                 io_filesfrom_f_out = -1;
00547                                         }
00548                                 }
00549                         } else if (io_filesfrom_f_in >= 0) {
00550                                 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
00551                                         int l = read(io_filesfrom_f_in,
00552                                                      io_filesfrom_buf,
00553                                                      sizeof io_filesfrom_buf);
00554                                         if (l <= 0) {
00555                                                 /* Send end-of-file marker */
                                                      /* Two NULs are staged when the last
                                                       * byte staged was not a NUL, otherwise
                                                       * one; the input is then marked closed. */
00556                                                 io_filesfrom_buf[0] = '\0';
00557                                                 io_filesfrom_buf[1] = '\0';
00558                                                 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
00559                                                 io_filesfrom_f_in = -1;
00560                                         } else {
00561                                                 if (!eol_nulls) {
00562                                                         char *s = io_filesfrom_buf + l;
00563                                                         /* Transform CR and/or LF into '\0' */
00564                                                         while (s-- > io_filesfrom_buf) {
00565                                                                 if (*s == '\n' || *s == '\r')
00566                                                                         *s = '\0';
00567                                                         }
00568                                                 }
00569                                                 if (!io_filesfrom_lastchar) {
00570                                                         /* Last buf ended with a '\0', so don't
00571                                                          * let this buf start with one. */
00572                                                         while (l && !*io_filesfrom_bp)
00573                                                                 io_filesfrom_bp++, l--;
00574                                                 }
                                                      /* Nothing but NULs were read: rewind
                                                       * and leave the buffer empty. */
00575                                                 if (!l)
00576                                                         io_filesfrom_bp = io_filesfrom_buf;
00577                                                 else {
00578                                                         char *f = io_filesfrom_bp;
00579                                                         char *t = f;
00580                                                         char *eob = f + l;
00581                                                         /* Eliminate any multi-'\0' runs. */
                                                              /* Compacts in place: f reads, t
                                                               * writes, and l shrinks by one for
                                                               * every extra NUL that is skipped. */
00582                                                         while (f != eob) {
00583                                                                 if (!(*t++ = *f++)) {
00584                                                                         while (f != eob && !*f)
00585                                                                                 f++, l--;
00586                                                                 }
00587                                                         }
00588                                                         io_filesfrom_lastchar = f[-1];
00589                                                 }
00590                                                 io_filesfrom_buflen = l;
00591                                         }
00592                                 }
00593                         }
00594                 }
00595
                      /* select() woke us for a side channel only; keep waiting
                       * for the fd we were actually asked to read. */
00596                 if (!FD_ISSET(fd, &r_fds))
00597                         continue;
00598
00599                 n = read(fd, buf, len);
00600
00601                 if (n <= 0) {
00602                         if (n == 0)
00603                                 whine_about_eof(fd); /* Doesn't return. */
                              /* Transient conditions: go around and wait again. */
00604                         if (errno == EINTR || errno == EWOULDBLOCK
00605                             || errno == EAGAIN)
00606                                 continue;
00607
00608                         /* Don't write errors on a dead socket. */
00609                         if (fd == sock_f_in) {
00610                                 close_multiplexing_out();
00611                                 rsyserr(FSOCKERR, errno, "read error");
00612                         } else
00613                                 rsyserr(FERROR, errno, "read error");
00614                         exit_cleanup(RERR_STREAMIO);
00615                 }
00616
00617                 buf += n;
00618                 len -= n;
00619                 cnt += n;
00620
                      /* Only input on the main socket resets the timeout clock
                       * that check_timeout() measures against. */
00621                 if (fd == sock_f_in && io_timeout)
00622                         last_io_in = time(NULL);
00623         }
00624
00625         return cnt;
00626 }
00627 
00628 /**
00629  * Read a line into the "fname" buffer (which must be at least MAXPATHLEN
00630  * characters long).
00631  */
00632 int read_filesfrom_line(int fd, char *fname)
00633 {
00634         char ch, *s, *eob = fname + MAXPATHLEN - 1;
00635         int cnt;
00636         int reading_remotely = filesfrom_host != NULL;
00637         int nulls = eol_nulls || reading_remotely;
00638 
00639   start:
00640         s = fname;
00641         while (1) {
00642                 cnt = read(fd, &ch, 1);
00643                 if (cnt < 0 && (errno == EWOULDBLOCK
00644                   || errno == EINTR || errno == EAGAIN)) {
00645                         struct timeval tv;
00646                         fd_set fds;
00647                         FD_ZERO(&fds);
00648                         FD_SET(fd, &fds);
00649                         tv.tv_sec = select_timeout;
00650                         tv.tv_usec = 0;
00651                         if (!select(fd+1, &fds, NULL, NULL, &tv))
00652                                 check_timeout();
00653                         continue;
00654                 }
00655                 if (cnt != 1)
00656                         break;
00657                 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
00658                         /* Skip empty lines if reading locally. */
00659                         if (!reading_remotely && s == fname)
00660                                 continue;
00661                         break;
00662                 }
00663                 if (s < eob)
00664                         *s++ = ch;
00665         }
00666         *s = '\0';
00667 
00668         /* Dump comments. */
00669         if (*fname == '#' || *fname == ';')
00670                 goto start;
00671 
00672         return s - fname;
00673 }
00674 
00675 static char *iobuf_out;
00676 static int iobuf_out_cnt;
00677 
00678 void io_start_buffering_out(void)
00679 {
00680         if (iobuf_out)
00681                 return;
00682         if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
00683                 out_of_memory("io_start_buffering_out");
00684         iobuf_out_cnt = 0;
00685 }
00686 
00687 static char *iobuf_in;
00688 static size_t iobuf_in_siz;
00689 
00690 void io_start_buffering_in(void)
00691 {
00692         if (iobuf_in)
00693                 return;
00694         iobuf_in_siz = 2 * IO_BUFFER_SIZE;
00695         if (!(iobuf_in = new_array(char, iobuf_in_siz)))
00696                 out_of_memory("io_start_buffering_in");
00697 }
00698 
00699 void io_end_buffering(void)
00700 {
00701         io_flush(NORMAL_FLUSH);
00702         if (!io_multiplexing_out) {
00703                 free(iobuf_out);
00704                 iobuf_out = NULL;
00705         }
00706 }
00707 
00708 void maybe_flush_socket(void)
00709 {
00710         if (iobuf_out && iobuf_out_cnt && time(NULL) - last_io_out >= 5)
00711                 io_flush(NORMAL_FLUSH);
00712 }
00713 
00714 void maybe_send_keepalive(void)
00715 {
00716         if (time(NULL) - last_io_out >= allowed_lull) {
00717                 if (!iobuf_out || !iobuf_out_cnt) {
00718                         if (protocol_version < 29)
00719                                 return; /* there's nothing we can do */
00720                         write_int(sock_f_out, the_file_list->count);
00721                         write_shortint(sock_f_out, ITEM_IS_NEW);
00722                 }
00723                 if (iobuf_out)
00724                         io_flush(NORMAL_FLUSH);
00725         }
00726 }
00727 
/**
 * Keep reading from fd until exactly len bytes have been stored in buf;
 * does not return before then.  Each pass relies on read_timeout(), which
 * never returns a value <= 0.
 **/
static void read_loop(int fd, char *buf, size_t len)
{
        size_t done = 0;

        while (done < len)
                done += read_timeout(fd, buf + done, len - done);
}
00741 
00742 /**
00743  * Read from the file descriptor handling multiplexing - return number
00744  * of bytes read.
00745  *
00746  * Never returns <= 0.
00747  */
00748 static int readfd_unbuffered(int fd, char *buf, size_t len)
00749 {
00750         static size_t remaining;
00751         static size_t iobuf_in_ndx;
00752         size_t msg_bytes;
00753         int tag, cnt = 0;
00754         char line[BIGPATHBUFLEN];
00755 
00756         if (!iobuf_in || fd != sock_f_in)
00757                 return read_timeout(fd, buf, len);
00758 
00759         if (!io_multiplexing_in && remaining == 0) {
00760                 remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
00761                 iobuf_in_ndx = 0;
00762         }
00763 
00764         while (cnt == 0) {
00765                 if (remaining) {
00766                         len = MIN(len, remaining);
00767                         memcpy(buf, iobuf_in + iobuf_in_ndx, len);
00768                         iobuf_in_ndx += len;
00769                         remaining -= len;
00770                         cnt = len;
00771                         break;
00772                 }
00773 
00774                 read_loop(fd, line, 4);
00775                 tag = IVAL(line, 0);
00776 
00777                 msg_bytes = tag & 0xFFFFFF;
00778                 tag = (tag >> 24) - MPLEX_BASE;
00779 
00780                 switch (tag) {
00781                 case MSG_DATA:
00782                         if (msg_bytes > iobuf_in_siz) {
00783                                 if (!(iobuf_in = realloc_array(iobuf_in, char,
00784                                                                msg_bytes)))
00785                                         out_of_memory("readfd_unbuffered");
00786                                 iobuf_in_siz = msg_bytes;
00787                         }
00788                         read_loop(fd, iobuf_in, msg_bytes);
00789                         remaining = msg_bytes;
00790                         iobuf_in_ndx = 0;
00791                         break;
00792                 case MSG_DELETED:
00793                         if (msg_bytes >= sizeof line)
00794                                 goto overflow;
00795                         read_loop(fd, line, msg_bytes);
00796                         /* A directory name was sent with the trailing null */
00797                         if (msg_bytes > 0 && !line[msg_bytes-1])
00798                                 log_delete(line, S_IFDIR);
00799                         else {
00800                                 line[msg_bytes] = '\0';
00801                                 log_delete(line, S_IFREG);
00802                         }
00803                         break;
00804                 case MSG_SUCCESS:
00805                         if (msg_bytes != 4) {
00806                                 rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
00807                                         tag, (long)msg_bytes, who_am_i());
00808                                 exit_cleanup(RERR_STREAMIO);
00809                         }
00810                         read_loop(fd, line, msg_bytes);
00811                         successful_send(IVAL(line, 0));
00812                         break;
00813                 case MSG_INFO:
00814                 case MSG_ERROR:
00815                         if (msg_bytes >= sizeof line) {
00816                             overflow:
00817                                 rprintf(FERROR,
00818                                         "multiplexing overflow %d:%ld [%s]\n",
00819                                         tag, (long)msg_bytes, who_am_i());
00820                                 exit_cleanup(RERR_STREAMIO);
00821                         }
00822                         read_loop(fd, line, msg_bytes);
00823                         rwrite((enum logcode)tag, line, msg_bytes);
00824                         break;
00825                 default:
00826                         rprintf(FERROR, "unexpected tag %d [%s]\n",
00827                                 tag, who_am_i());
00828                         exit_cleanup(RERR_STREAMIO);
00829                 }
00830         }
00831 
00832         if (remaining == 0)
00833                 io_flush(NORMAL_FLUSH);
00834 
00835         return cnt;
00836 }
00837 
00838 /**
00839  * Do a buffered read from @p fd.  Don't return until all @p n bytes
00840  * have been read.  If all @p n can't be read then exit with an
00841  * error.
00842  **/
00843 static void readfd(int fd, char *buffer, size_t N)
00844 {
00845         int  cnt;
00846         size_t total = 0;
00847 
00848         while (total < N) {
00849                 cnt = readfd_unbuffered(fd, buffer + total, N-total);
00850                 total += cnt;
00851         }
00852 
00853         if (fd == write_batch_monitor_in) {
00854                 if ((size_t)write(batch_fd, buffer, total) != total)
00855                         exit_cleanup(RERR_FILEIO);
00856         }
00857 
00858         if (fd == sock_f_in)
00859                 stats.total_read += total;
00860 }
00861 
00862 int read_shortint(int f)
00863 {
00864         uchar b[2];
00865         readfd(f, (char *)b, 2);
00866         return (b[1] << 8) + b[0];
00867 }
00868 
00869 int32 read_int(int f)
00870 {
00871         char b[4];
00872         int32 num;
00873 
00874         readfd(f,b,4);
00875         num = IVAL(b,0);
00876         if (num == (int32)0xffffffff)
00877                 return -1;
00878         return num;
00879 }
00880 
00881 int64 read_longint(int f)
00882 {
00883         int64 num;
00884         char b[8];
00885         num = read_int(f);
00886 
00887         if ((int32)num != (int32)0xffffffff)
00888                 return num;
00889 
00890 #if SIZEOF_INT64 < 8
00891         rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
00892         exit_cleanup(RERR_UNSUPPORTED);
00893 #else
00894         readfd(f,b,8);
00895         num = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
00896 #endif
00897 
00898         return num;
00899 }
00900 
/* Public wrapper around readfd(): read exactly len bytes from f into buf. */
void read_buf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
}
00905 
/* Read exactly len bytes from f into buf and null-terminate the result,
 * so buf must have room for len + 1 bytes. */
void read_sbuf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
	*(buf + len) = '\0';
}
00911 
00912 uchar read_byte(int f)
00913 {
00914         uchar c;
00915         readfd(f, (char *)&c, 1);
00916         return c;
00917 }
00918 
00919 int read_vstring(int f, char *buf, int bufsize)
00920 {
00921         int len = read_byte(f);
00922 
00923         if (len & 0x80)
00924                 len = (len & ~0x80) * 0x100 + read_byte(f);
00925 
00926         if (len >= bufsize) {
00927                 rprintf(FERROR, "over-long vstring received (%d > %d)\n",
00928                         len, bufsize - 1);
00929                 return -1;
00930         }
00931 
00932         if (len)
00933                 readfd(f, buf, len);
00934         buf[len] = '\0';
00935         return len;
00936 }
00937 
00938 /* Populate a sum_struct with values from the socket.  This is
00939  * called by both the sender and the receiver. */
00940 void read_sum_head(int f, struct sum_struct *sum)
00941 {
00942         sum->count = read_int(f);
00943         if (sum->count < 0) {
00944                 rprintf(FERROR, "Invalid checksum count %ld [%s]\n",
00945                         (long)sum->count, who_am_i());
00946                 exit_cleanup(RERR_PROTOCOL);
00947         }
00948         sum->blength = read_int(f);
00949         if (sum->blength < 0 || sum->blength > MAX_BLOCK_SIZE) {
00950                 rprintf(FERROR, "Invalid block length %ld [%s]\n",
00951                         (long)sum->blength, who_am_i());
00952                 exit_cleanup(RERR_PROTOCOL);
00953         }
00954         sum->s2length = protocol_version < 27 ? csum_length : (int)read_int(f);
00955         if (sum->s2length < 0 || sum->s2length > MD4_SUM_LENGTH) {
00956                 rprintf(FERROR, "Invalid checksum length %d [%s]\n",
00957                         sum->s2length, who_am_i());
00958                 exit_cleanup(RERR_PROTOCOL);
00959         }
00960         sum->remainder = read_int(f);
00961         if (sum->remainder < 0 || sum->remainder > sum->blength) {
00962                 rprintf(FERROR, "Invalid remainder length %ld [%s]\n",
00963                         (long)sum->remainder, who_am_i());
00964                 exit_cleanup(RERR_PROTOCOL);
00965         }
00966 }
00967 
00968 /* Send the values from a sum_struct over the socket.  Set sum to
00969  * NULL if there are no checksums to send.  This is called by both
00970  * the generator and the sender. */
00971 void write_sum_head(int f, struct sum_struct *sum)
00972 {
00973         static struct sum_struct null_sum;
00974 
00975         if (sum == NULL)
00976                 sum = &null_sum;
00977 
00978         write_int(f, sum->count);
00979         write_int(f, sum->blength);
00980         if (protocol_version >= 27)
00981                 write_int(f, sum->s2length);
00982         write_int(f, sum->remainder);
00983 }
00984 
00985 /**
00986  * Sleep after writing to limit I/O bandwidth usage.
00987  *
00988  * @todo Rather than sleeping after each write, it might be better to
00989  * use some kind of averaging.  The current algorithm seems to always
00990  * use a bit less bandwidth than specified, because it doesn't make up
00991  * for slow periods.  But arguably this is a feature.  In addition, we
00992  * ought to take the time used to write the data into account.
00993  *
00994  * During some phases of big transfers (file FOO is uptodate) this is
00995  * called with a small bytes_written every time.  As the kernel has to
00996  * round small waits up to guarantee that we actually wait at least the
00997  * requested number of microseconds, this can become grossly inaccurate.
00998  * We therefore keep track of the bytes we've written over time and only
00999  * sleep when the accumulated delay is at least 1 tenth of a second.
01000  **/
01001 static void sleep_for_bwlimit(int bytes_written)
01002 {
01003         static struct timeval prior_tv;
01004         static long total_written = 0; 
01005         struct timeval tv, start_tv;
01006         long elapsed_usec, sleep_usec;
01007 
01008 #define ONE_SEC 1000000L /* # of microseconds in a second */
01009 
01010         if (!bwlimit_writemax)
01011                 return;
01012 
01013         total_written += bytes_written; 
01014 
01015         gettimeofday(&start_tv, NULL);
01016         if (prior_tv.tv_sec) {
01017                 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
01018                              + (start_tv.tv_usec - prior_tv.tv_usec);
01019                 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
01020                 if (total_written < 0)
01021                         total_written = 0;
01022         }
01023 
01024         sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
01025         if (sleep_usec < ONE_SEC / 10) {
01026                 prior_tv = start_tv;
01027                 return;
01028         }
01029 
01030         tv.tv_sec  = sleep_usec / ONE_SEC;
01031         tv.tv_usec = sleep_usec % ONE_SEC;
01032         select(0, NULL, NULL, NULL, &tv);
01033 
01034         gettimeofday(&prior_tv, NULL);
01035         elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
01036                      + (prior_tv.tv_usec - start_tv.tv_usec);
01037         total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
01038 }
01039 
01040 /* Write len bytes to the file descriptor fd, looping as necessary to get
01041  * the job done and also (in certain circumstances) reading any data on
01042  * msg_fd_in to avoid deadlock.
01043  *
01044  * This function underlies the multiplexing system.  The body of the
01045  * application never calls this function directly. */
01046 static void writefd_unbuffered(int fd,char *buf,size_t len)
01047 {
01048         size_t n, total = 0;
01049         fd_set w_fds, r_fds;
01050         int maxfd, count, cnt, using_r_fds;
01051         int defer_save = defer_forwarding_messages;
01052         struct timeval tv;
01053 
01054         no_flush++;
01055 
01056         while (total < len) {
01057                 FD_ZERO(&w_fds);
01058                 FD_SET(fd,&w_fds);
01059                 maxfd = fd;
01060 
01061                 if (msg_fd_in >= 0) {
01062                         FD_ZERO(&r_fds);
01063                         FD_SET(msg_fd_in,&r_fds);
01064                         if (msg_fd_in > maxfd)
01065                                 maxfd = msg_fd_in;
01066                         using_r_fds = 1;
01067                 } else
01068                         using_r_fds = 0;
01069 
01070                 tv.tv_sec = select_timeout;
01071                 tv.tv_usec = 0;
01072 
01073                 errno = 0;
01074                 count = select(maxfd + 1, using_r_fds ? &r_fds : NULL,
01075                                &w_fds, NULL, &tv);
01076 
01077                 if (count <= 0) {
01078                         if (count < 0 && errno == EBADF)
01079                                 exit_cleanup(RERR_SOCKETIO);
01080                         check_timeout();
01081                         continue;
01082                 }
01083 
01084                 if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds))
01085                         read_msg_fd();
01086 
01087                 if (!FD_ISSET(fd, &w_fds))
01088                         continue;
01089 
01090                 n = len - total;
01091                 if (bwlimit_writemax && n > bwlimit_writemax)
01092                         n = bwlimit_writemax;
01093                 cnt = write(fd, buf + total, n);
01094 
01095                 if (cnt <= 0) {
01096                         if (cnt < 0) {
01097                                 if (errno == EINTR)
01098                                         continue;
01099                                 if (errno == EWOULDBLOCK || errno == EAGAIN) {
01100                                         msleep(1);
01101                                         continue;
01102                                 }
01103                         }
01104 
01105                         /* Don't try to write errors back across the stream. */
01106                         if (fd == sock_f_out)
01107                                 close_multiplexing_out();
01108                         rsyserr(FERROR, errno,
01109                                 "writefd_unbuffered failed to write %ld bytes [%s]",
01110                                 (long)len, who_am_i());
01111                         /* If the other side is sending us error messages, try
01112                          * to grab any messages they sent before they died. */
01113                         while (fd == sock_f_out && io_multiplexing_in) {
01114                                 set_io_timeout(30);
01115                                 ignore_timeout = 0;
01116                                 readfd_unbuffered(sock_f_in, io_filesfrom_buf,
01117                                                   sizeof io_filesfrom_buf);
01118                         }
01119                         exit_cleanup(RERR_STREAMIO);
01120                 }
01121 
01122                 total += cnt;
01123                 defer_forwarding_messages = 1;
01124 
01125                 if (fd == sock_f_out) {
01126                         if (io_timeout || am_generator)
01127                                 last_io_out = time(NULL);
01128                         sleep_for_bwlimit(cnt);
01129                 }
01130         }
01131 
01132         defer_forwarding_messages = defer_save;
01133         no_flush--;
01134 }
01135 
01136 static void msg2sndr_flush(void)
01137 {
01138         if (defer_forwarding_messages)
01139                 return;
01140 
01141         while (msg2sndr.head && io_multiplexing_out) {
01142                 struct msg_list_item *m = msg2sndr.head;
01143                 if (!(msg2sndr.head = m->next))
01144                         msg2sndr.tail = NULL;
01145                 stats.total_written += m->len;
01146                 defer_forwarding_messages = 1;
01147                 writefd_unbuffered(sock_f_out, m->buf, m->len);
01148                 defer_forwarding_messages = 0;
01149                 free(m);
01150         }
01151 }
01152 
01153 /**
01154  * Write an message to a multiplexed stream. If this fails then rsync
01155  * exits.
01156  **/
01157 static void mplex_write(enum msgcode code, char *buf, size_t len)
01158 {
01159         char buffer[1024];
01160         size_t n = len;
01161 
01162         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
01163 
01164         if (n > sizeof buffer - 4)
01165                 n = 0;
01166         else
01167                 memcpy(buffer + 4, buf, n);
01168 
01169         writefd_unbuffered(sock_f_out, buffer, n+4);
01170 
01171         len -= n;
01172         buf += n;
01173 
01174         if (len) {
01175                 defer_forwarding_messages = 1;
01176                 writefd_unbuffered(sock_f_out, buf, len);
01177                 defer_forwarding_messages = 0;
01178                 msg2sndr_flush();
01179         }
01180 }
01181 
01182 void io_flush(int flush_it_all)
01183 {
01184         msg2genr_flush(flush_it_all);
01185         msg2sndr_flush();
01186 
01187         if (!iobuf_out_cnt || no_flush)
01188                 return;
01189 
01190         if (io_multiplexing_out)
01191                 mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
01192         else
01193                 writefd_unbuffered(sock_f_out, iobuf_out, iobuf_out_cnt);
01194         iobuf_out_cnt = 0;
01195 }
01196 
01197 static void writefd(int fd,char *buf,size_t len)
01198 {
01199         if (fd == msg_fd_out) {
01200                 rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
01201                 exit_cleanup(RERR_PROTOCOL);
01202         }
01203 
01204         if (fd == sock_f_out)
01205                 stats.total_written += len;
01206 
01207         if (fd == write_batch_monitor_out) {
01208                 if ((size_t)write(batch_fd, buf, len) != len)
01209                         exit_cleanup(RERR_FILEIO);
01210         }
01211 
01212         if (!iobuf_out || fd != sock_f_out) {
01213                 writefd_unbuffered(fd, buf, len);
01214                 return;
01215         }
01216 
01217         while (len) {
01218                 int n = MIN((int)len, IO_BUFFER_SIZE - iobuf_out_cnt);
01219                 if (n > 0) {
01220                         memcpy(iobuf_out+iobuf_out_cnt, buf, n);
01221                         buf += n;
01222                         len -= n;
01223                         iobuf_out_cnt += n;
01224                 }
01225 
01226                 if (iobuf_out_cnt == IO_BUFFER_SIZE)
01227                         io_flush(NORMAL_FLUSH);
01228         }
01229 }
01230 
01231 void write_shortint(int f, int x)
01232 {
01233         uchar b[2];
01234         b[0] = x;
01235         b[1] = x >> 8;
01236         writefd(f, (char *)b, 2);
01237 }
01238 
01239 void write_int(int f,int32 x)
01240 {
01241         char b[4];
01242         SIVAL(b,0,x);
01243         writefd(f,b,4);
01244 }
01245 
01246 /*
01247  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
01248  * 64-bit types on this platform.
01249  */
01250 void write_longint(int f, int64 x)
01251 {
01252         char b[8];
01253 
01254         if (x <= 0x7FFFFFFF) {
01255                 write_int(f, (int)x);
01256                 return;
01257         }
01258 
01259 #if SIZEOF_INT64 < 8
01260         rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
01261         exit_cleanup(RERR_UNSUPPORTED);
01262 #else
01263         write_int(f, (int32)0xFFFFFFFF);
01264         SIVAL(b,0,(x&0xFFFFFFFF));
01265         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
01266 
01267         writefd(f,b,8);
01268 #endif
01269 }
01270 
/* Public wrapper around writefd(): write len bytes from buf to f. */
void write_buf(int f, char *buf, size_t len)
{
	writefd(f, buf, len);
}
01275 
/** Write the null-terminated string @p buf to the connection; the
 * terminating null itself is not sent. */
void write_sbuf(int f, char *buf)
{
	size_t len = strlen(buf);

	writefd(f, buf, len);
}
01281 
01282 void write_byte(int f, uchar c)
01283 {
01284         writefd(f, (char *)&c, 1);
01285 }
01286 
01287 void write_vstring(int f, char *str, int len)
01288 {
01289         uchar lenbuf[3], *lb = lenbuf;
01290 
01291         if (len > 0x7F) {
01292                 if (len > 0x7FFF) {
01293                         rprintf(FERROR,
01294                                 "attempting to send over-long vstring (%d > %d)\n",
01295                                 len, 0x7FFF);
01296                         exit_cleanup(RERR_PROTOCOL);
01297                 }
01298                 *lb++ = len / 0x100 + 0x80;
01299         }
01300         *lb = len;
01301 
01302         writefd(f, (char*)lenbuf, lb - lenbuf + 1);
01303         if (len)
01304                 writefd(f, str, len);
01305 }
01306 
/**
 * Read one line of at most @p maxlen characters into @p buf.  The
 * terminating null is not counted in @p maxlen, so @p buf needs room
 * for @p maxlen + 1 bytes.  The (required) trailing newline is
 * stripped, and carriage returns are dropped wherever they occur.
 *
 * @return 1 for success; 0 if a null byte was read or if @p maxlen
 * characters were stored before a newline was seen (truncation).
 **/
int read_line(int f, char *buf, size_t maxlen)
{
	char *cur = buf;
	size_t room = maxlen;

	while (room != 0) {
		*cur = '\0';
		read_buf(f, cur, 1);

		if (*cur == '\0')
			return 0;
		if (*cur == '\n')
			break;
		if (*cur == '\r')
			continue;	/* overwritten by the next read */

		cur++;
		room--;
	}

	*cur = '\0';
	return room > 0;
}
01331 
01332 void io_printf(int fd, const char *format, ...)
01333 {
01334         va_list ap;
01335         char buf[BIGPATHBUFLEN];
01336         int len;
01337 
01338         va_start(ap, format);
01339         len = vsnprintf(buf, sizeof buf, format, ap);
01340         va_end(ap);
01341 
01342         if (len < 0)
01343                 exit_cleanup(RERR_STREAMIO);
01344 
01345         if (len > (int)sizeof buf) {
01346                 rprintf(FERROR, "io_printf() was too long for the buffer.\n");
01347                 exit_cleanup(RERR_STREAMIO);
01348         }
01349 
01350         write_sbuf(fd, buf);
01351 }
01352 
01353 /** Setup for multiplexing a MSG_* stream with the data stream. */
01354 void io_start_multiplex_out(void)
01355 {
01356         io_flush(NORMAL_FLUSH);
01357         io_start_buffering_out();
01358         io_multiplexing_out = 1;
01359 }
01360 
01361 /** Setup for multiplexing a MSG_* stream with the data stream. */
01362 void io_start_multiplex_in(void)
01363 {
01364         io_flush(NORMAL_FLUSH);
01365         io_start_buffering_in();
01366         io_multiplexing_in = 1;
01367 }
01368 
01369 /** Write an message to the multiplexed data stream. */
01370 int io_multiplex_write(enum msgcode code, char *buf, size_t len)
01371 {
01372         if (!io_multiplexing_out)
01373                 return 0;
01374 
01375         io_flush(NORMAL_FLUSH);
01376         stats.total_written += (len+4);
01377         mplex_write(code, buf, len);
01378         return 1;
01379 }
01380 
01381 void close_multiplexing_in(void)
01382 {
01383         io_multiplexing_in = 0;
01384 }
01385 
01386 /** Stop output multiplexing. */
01387 void close_multiplexing_out(void)
01388 {
01389         io_multiplexing_out = 0;
01390 }
01391 
01392 void start_write_batch(int fd)
01393 {
01394         write_stream_flags(batch_fd);
01395 
01396         /* Some communication has already taken place, but we don't
01397          * enable batch writing until here so that we can write a
01398          * canonical record of the communication even though the
01399          * actual communication so far depends on whether a daemon
01400          * is involved. */
01401         write_int(batch_fd, protocol_version);
01402         write_int(batch_fd, checksum_seed);
01403 
01404         if (am_sender)
01405                 write_batch_monitor_out = fd;
01406         else
01407                 write_batch_monitor_in = fd;
01408 }
01409 
01410 void stop_write_batch(void)
01411 {
01412         write_batch_monitor_out = -1;
01413         write_batch_monitor_in = -1;
01414 }

rsyncに対してSat Dec 5 19:45:41 2009に生成されました。  doxygen 1.4.7