io.c

Socket and pipe I/O utilities used in rsync. [詳細]

ソースコードを見る。


データ構造

struct  flist_ndx_item
struct  flist_ndx_list
struct  msg_list_item
struct  msg_list

関数

static void read_loop (int fd, char *buf, size_t len)
 Continue trying to read len bytes - don't return until len has been read.
static void flist_ndx_push (struct flist_ndx_list *lp, int ndx)
static int flist_ndx_pop (struct flist_ndx_list *lp)
static void check_timeout (void)
void io_set_sock_fds (int f_in, int f_out)
void set_io_timeout (int secs)
void set_msg_fd_in (int fd)
void set_msg_fd_out (int fd)
static void msg_list_add (struct msg_list *lst, int code, char *buf, int len)
static void read_msg_fd (void)
void increment_active_files (int ndx, int itemizing, enum logcode code)
void decrement_active_files (int ndx)
static int msg2genr_flush (int flush_it_all)
void send_msg (enum msgcode code, char *buf, int len)
int get_redo_num (int itemizing, enum logcode code)
int get_hlink_num (void)
void io_set_filesfrom_fds (int f_in, int f_out)
 When we're the receiver and we have a local --files-from list of names that needs to be sent over the socket to the sender, we have to do two things at the same time: send the sender a list of what files we're processing and read the incoming file+info list from the sender.
static void whine_about_eof (int fd)
static int read_timeout (int fd, char *buf, size_t len)
 Read from a socket with I/O timeout.
int read_filesfrom_line (int fd, char *fname)
 Read a line into the "fname" buffer (which must be at least MAXPATHLEN characters long).
void io_start_buffering_out (void)
void io_start_buffering_in (void)
void io_end_buffering (void)
void maybe_flush_socket (void)
void maybe_send_keepalive (void)
static int readfd_unbuffered (int fd, char *buf, size_t len)
 Read from the file descriptor handling multiplexing - return number of bytes read.
static void readfd (int fd, char *buffer, size_t N)
 Do a buffered read from fd.
int read_shortint (int f)
int32 read_int (int f)
int64 read_longint (int f)
void read_buf (int f, char *buf, size_t len)
void read_sbuf (int f, char *buf, size_t len)
uchar read_byte (int f)
int read_vstring (int f, char *buf, int bufsize)
void read_sum_head (int f, struct sum_struct *sum)
void write_sum_head (int f, struct sum_struct *sum)
static void sleep_for_bwlimit (int bytes_written)
 Sleep after writing to limit I/O bandwidth usage.
static void writefd_unbuffered (int fd, char *buf, size_t len)
static void msg2sndr_flush (void)
static void mplex_write (enum msgcode code, char *buf, size_t len)
 Write a message to a multiplexed stream.
void io_flush (int flush_it_all)
static void writefd (int fd, char *buf, size_t len)
void write_shortint (int f, int x)
void write_int (int f, int32 x)
void write_longint (int f, int64 x)
void write_buf (int f, char *buf, size_t len)
void write_sbuf (int f, char *buf)
 Write a string to the connection
void write_byte (int f, uchar c)
void write_vstring (int f, char *str, int len)
int read_line (int f, char *buf, size_t maxlen)
 Read a line of up to maxlen characters into buf (not counting the trailing null).
void io_printf (int fd, const char *format,...)
void io_start_multiplex_out (void)
 Setup for multiplexing a MSG_* stream with the data stream.
void io_start_multiplex_in (void)
 Setup for multiplexing a MSG_* stream with the data stream.
int io_multiplex_write (enum msgcode code, char *buf, size_t len)
 Write a message to the multiplexed data stream.
void close_multiplexing_in (void)
void close_multiplexing_out (void)
 Stop output multiplexing.
void start_write_batch (int fd)
void stop_write_batch (void)

変数

int bwlimit
size_t bwlimit_writemax
int io_timeout
int allowed_lull
int am_server
int am_daemon
int am_sender
int am_generator
int eol_nulls
int read_batch
int csum_length
int checksum_seed
int protocol_version
int remove_sent_files
int preserve_hard_links
char * filesfrom_host
stats stats
file_list * the_file_list
const char phase_unknown [] = "unknown"
int ignore_timeout = 0
int batch_fd = -1
int batch_gen_fd = -1
int kluge_around_eof = 0
int msg_fd_in = -1
int msg_fd_out = -1
int sock_f_in = -1
int sock_f_out = -1
static int io_multiplexing_out
static int io_multiplexing_in
static time_t last_io_in
static time_t last_io_out
static int no_flush
static int write_batch_monitor_in = -1
static int write_batch_monitor_out = -1
static int io_filesfrom_f_in = -1
static int io_filesfrom_f_out = -1
static char io_filesfrom_buf [2048]
static char * io_filesfrom_bp
static char io_filesfrom_lastchar
static int io_filesfrom_buflen
static int defer_forwarding_messages = 0
static int select_timeout = SELECT_TIMEOUT
static int active_filecnt = 0
static OFF_T active_bytecnt = 0
static struct flist_ndx_list redo_list, hlink_list
static struct msg_list msg2genr, msg2sndr
static char * iobuf_out
static int iobuf_out_cnt
static char * iobuf_in
static size_t iobuf_in_siz

説明

Socket and pipe I/O utilities used in rsync.

rsync provides its own multiplexing system, which is used to send stderr and stdout over a single socket. We need this because stdout normally carries the binary data stream, and stderr all our error messages.

For historical reasons this is off during the start of the connection, but it's switched on quite early using io_start_multiplex_out() and io_start_multiplex_in().

io.c で定義されています。


関数

static void read_loop ( int  fd,
char *  buf,
size_t  len 
) [static]

Continue trying to read len bytes - don't return until len has been read.

io.c の 732 行で定義されています。

参照先 read_timeout().

参照元 read_msg_fd(), readfd_unbuffered().

00733 {
              /* Keep calling read_timeout() until exactly len bytes have been
               * stored in buf.  read_timeout() is documented to never return
               * a value <= 0, so every pass makes progress. */
00734         while (len) {
00735                 int n = read_timeout(fd, buf, len);
00736 
                      /* Advance past the bytes just read and shrink the
                       * outstanding count by the same amount. */
00737                 buf += n;
00738                 len -= n;
00739         }
00740 }

static void flist_ndx_push ( struct flist_ndx_list *  lp,
int  ndx 
) [static]

io.c の 119 行で定義されています。

参照先 flist_ndx_list::head, flist_ndx_item::ndx, flist_ndx_item::next, out_of_memory(), flist_ndx_list::tail.

参照元 read_msg_fd().

00120 {
              /* Append the index ndx to the tail of the singly linked list lp. */
00121         struct flist_ndx_item *item;
00122 
              /* Allocation failure is reported through out_of_memory(). */
00123         if (!(item = new(struct flist_ndx_item)))
00124                 out_of_memory("flist_ndx_push");
00125         item->next = NULL;
00126         item->ndx = ndx;
              /* Link after the current tail, or become the head when the
               * list is empty; either way the new item is the new tail. */
00127         if (lp->tail)
00128                 lp->tail->next = item;
00129         else
00130                 lp->head = item;
00131         lp->tail = item;
00132 }

static int flist_ndx_pop ( struct flist_ndx_list *  lp  )  [static]

io.c の 134 行で定義されています。

参照先 flist_ndx_list::head, flist_ndx_item::ndx, flist_ndx_item::next, flist_ndx_list::tail.

参照元 get_hlink_num(), get_redo_num().

00135 {
              /* Remove the head item of lp and return the index it held.
               * Returns -1 when the list is empty.  Note that read_msg_fd()
               * also pushes -1 as an ordinary entry (on MSG_DONE), so the
               * return value alone does not distinguish the two cases. */
00136         struct flist_ndx_item *next;
00137         int ndx;
00138 
00139         if (!lp->head)
00140                 return -1;
00141 
              /* Save what is needed from the head before freeing it. */
00142         ndx = lp->head->ndx;
00143         next = lp->head->next;
00144         free(lp->head);
00145         lp->head = next;
              /* The list just became empty, so the tail must be cleared too. */
00146         if (!next)
00147                 lp->tail = NULL;
00148 
00149         return ndx;
00150 }

static void check_timeout ( void   )  [static]

io.c の 152 行で定義されています。

参照先 am_daemon, am_server, FERROR, ignore_timeout, io_timeout, last_io_in, rprintf().

参照元 msg2genr_flush(), read_filesfrom_line(), read_timeout(), writefd_unbuffered().

00153 {
              /* Exit with RERR_TIMEOUT when at least io_timeout seconds have
               * passed since last_io_in. */
00154         time_t t;
00155 
              /* Nothing to check when no timeout is configured or when
               * timeouts are currently being ignored. */
00156         if (!io_timeout || ignore_timeout)
00157                 return;
00158 
              /* No input timestamp recorded yet: start the clock now. */
00159         if (!last_io_in) {
00160                 last_io_in = time(NULL);
00161                 return;
00162         }
00163 
00164         t = time(NULL);
00165 
00166         if (t - last_io_in >= io_timeout) {
                      /* The message is only printed when we are neither a
                       * server nor a daemon; the exit happens either way. */
00167                 if (!am_server && !am_daemon) {
00168                         rprintf(FERROR, "io timeout after %d seconds -- exiting\n",
00169                                 (int)(t-last_io_in));
00170                 }
00171                 exit_cleanup(RERR_TIMEOUT);
00172         }
00173 }

void io_set_sock_fds ( int  f_in,
int  f_out 
)

io.c の 177 行で定義されています。

参照先 sock_f_in, sock_f_out.

参照元 client_run(), start_daemon(), start_server().

00178 {
              /* Record the descriptors in sock_f_in / sock_f_out; other
               * routines in this file compare a descriptor against them. */
00179         sock_f_in = f_in;
00180         sock_f_out = f_out;
00181 }

void set_io_timeout ( int  secs  ) 

io.c の 183 行で定義されています。

参照先 allowed_lull, io_timeout, read_batch, select_timeout.

参照元 rsync_module(), writefd_unbuffered().

00184 {
              /* Set the I/O timeout to secs and derive the dependent values. */
00185         io_timeout = secs;
00186 
              /* select_timeout is io_timeout capped at SELECT_TIMEOUT; a zero
               * (disabled) timeout also uses SELECT_TIMEOUT. */
00187         if (!io_timeout || io_timeout > SELECT_TIMEOUT)
00188                 select_timeout = SELECT_TIMEOUT;
00189         else
00190                 select_timeout = io_timeout;
00191 
              /* allowed_lull is half the timeout, rounded up, or 0 when
               * read_batch is set. */
00192         allowed_lull = read_batch ? 0 : (io_timeout + 1) / 2;
00193 }

void set_msg_fd_in ( int  fd  ) 

io.c の 199 行で定義されています。

参照先 msg_fd_in.

参照元 client_run(), do_recv().

00200 {
              /* Record fd as the descriptor that read_msg_fd() reads from. */
00201         msg_fd_in = fd;
00202 }

void set_msg_fd_out ( int  fd  ) 

io.c の 206 行で定義されています。

参照先 msg_fd_out, set_nonblocking().

参照元 do_recv().

00207 {
              /* Record fd as the message output descriptor and switch it to
               * non-blocking mode; msg2genr_flush() handles the resulting
               * EWOULDBLOCK/EAGAIN results from write(). */
00208         msg_fd_out = fd;
00209         set_nonblocking(msg_fd_out);
00210 }

static void msg_list_add ( struct msg_list *  lst,
int  code,
char *  buf,
int  len 
) [static]

io.c の 213 行で定義されています。

参照先 msg_list::head, m, msg_list_item::next, out_of_memory(), msg_list::tail.

参照元 read_msg_fd(), send_msg().

00214 {
              /* Queue a message (4-byte header followed by len payload bytes
               * copied from buf) at the tail of lst. */
00215         struct msg_list_item *m;
              /* Payload + 4 header bytes + the item struct itself.  The "- 1"
               * presumably discounts a 1-byte buf[] placeholder inside
               * struct msg_list_item -- confirm against its definition. */
00216         int sz = len + 4 + sizeof m[0] - 1;
00217 
00218         if (!(m = (struct msg_list_item *)new_array(char, sz)))
00219                 out_of_memory("msg_list_add");
00220         m->next = NULL;
00221         m->len = len + 4;
              /* Header word: (code + MPLEX_BASE) in the top 8 bits, payload
               * length in the low 24 bits -- the same layout that
               * read_msg_fd() decodes. */
00222         SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len);
00223         memcpy(m->buf + 4, buf, len);
              /* Append: after the current tail, or as head of an empty list. */
00224         if (lst->tail)
00225                 lst->tail->next = m;
00226         else
00227                 lst->head = m;
00228         lst->tail = m;
00229 }

static void read_msg_fd ( void   )  [static]

io.c の 235 行で定義されています。

参照先 am_generator, am_server, buf, close_multiplexing_out(), decrement_active_files(), defer_forwarding_messages, FERROR, flist_ndx_push(), hlink_list, io_multiplex_write(), msg_list_item::len, msg2sndr, MSG_DELETED, MSG_DONE, MSG_ERROR, msg_fd_in, MSG_INFO, msg_list_add(), MSG_LOG, MSG_REDO, MSG_SOCKERR, MSG_SUCCESS, preserve_hard_links, read_loop(), remove_sent_files, rprintf(), rwrite(), who_am_i().

参照元 get_redo_num(), increment_active_files(), writefd_unbuffered().

00236 {
              /* Read one complete message from msg_fd_in and act on it.  Any
               * malformed or unexpected message ends the program through
               * exit_cleanup(RERR_STREAMIO). */
00237         char buf[2048];
00238         size_t n;
00239         int fd = msg_fd_in;
00240         int tag, len;
00241 
00242         /* Temporarily disable msg_fd_in.  This is needed to avoid looping back
00243          * to this routine from writefd_unbuffered(). */
00244         msg_fd_in = -1;
00245 
              /* 4-byte header: payload length in the low 24 bits, and
               * (tag + MPLEX_BASE) in the high 8 bits. */
00246         read_loop(fd, buf, 4);
00247         tag = IVAL(buf, 0);
00248 
00249         len = tag & 0xFFFFFF;
00250         tag = (tag >> 24) - MPLEX_BASE;
00251 
00252         switch (tag) {
00253         case MSG_DONE:
                      /* Must have no payload and is only accepted by the
                       * generator; queues -1 on the redo list. */
00254                 if (len != 0 || !am_generator) {
00255                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
00256                         exit_cleanup(RERR_STREAMIO);
00257                 }
00258                 flist_ndx_push(&redo_list, -1);
00259                 break;
00260         case MSG_REDO:
                      /* Payload is one 4-byte file index, which is queued on
                       * the redo list. */
00261                 if (len != 4 || !am_generator) {
00262                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
00263                         exit_cleanup(RERR_STREAMIO);
00264                 }
00265                 read_loop(fd, buf, 4);
00266                 if (remove_sent_files)
00267                         decrement_active_files(IVAL(buf,0));
00268                 flist_ndx_push(&redo_list, IVAL(buf,0));
00269                 break;
00270         case MSG_DELETED:
                      /* Payload must fit in buf.  It is passed on unchanged:
                       * queued on msg2sndr while forwarding is deferred,
                       * otherwise written with io_multiplex_write(). */
00271                 if (len >= (int)sizeof buf || !am_generator) {
00272                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
00273                         exit_cleanup(RERR_STREAMIO);
00274                 }
00275                 read_loop(fd, buf, len);
00276                 if (defer_forwarding_messages)
00277                         msg_list_add(&msg2sndr, MSG_DELETED, buf, len);
00278                 else
00279                         io_multiplex_write(MSG_DELETED, buf, len);
00280                 break;
00281         case MSG_SUCCESS:
                      /* Payload is one 4-byte file index.  It is forwarded
                       * only when remove_sent_files is set, and queued on
                       * hlink_list only when preserve_hard_links is set. */
00282                 if (len != 4 || !am_generator) {
00283                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
00284                         exit_cleanup(RERR_STREAMIO);
00285                 }
00286                 read_loop(fd, buf, len);
00287                 if (remove_sent_files) {
00288                         decrement_active_files(IVAL(buf,0));
00289                         if (defer_forwarding_messages)
00290                                 msg_list_add(&msg2sndr, MSG_SUCCESS, buf, len);
00291                         else
00292                                 io_multiplex_write(MSG_SUCCESS, buf, len);
00293                 }
00294                 if (preserve_hard_links)
00295                         flist_ndx_push(&hlink_list, IVAL(buf,0));
00296                 break;
00297         case MSG_SOCKERR:
                      /* Generator only: stop multiplexed output first, then
                       * handle the text like the other log-type messages. */
00298                 if (!am_generator) {
00299                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
00300                         exit_cleanup(RERR_STREAMIO);
00301                 }
00302                 close_multiplexing_out();
00303                 /* FALL THROUGH */
00304         case MSG_INFO:
00305         case MSG_ERROR:
00306         case MSG_LOG:
                      /* The payload may exceed buf, so it is consumed in
                       * chunks of at most sizeof buf - 1 bytes; each chunk is
                       * queued on msg2sndr or handed to rwrite() on its own. */
00307                 while (len) {
00308                         n = len;
00309                         if (n >= sizeof buf)
00310                                 n = sizeof buf - 1;
00311                         read_loop(fd, buf, n);
00312                         if (am_generator && am_server && defer_forwarding_messages)
00313                                 msg_list_add(&msg2sndr, tag, buf, n);
00314                         else
00315                                 rwrite((enum logcode)tag, buf, n);
00316                         len -= n;
00317                 }
00318                 break;
00319         default:
00320                 rprintf(FERROR, "unknown message %d:%d [%s]\n",
00321                         tag, len, who_am_i());
00322                 exit_cleanup(RERR_STREAMIO);
00323         }
00324 
              /* Re-enable the descriptor that was disabled on entry. */
00325         msg_fd_in = fd;
00326 }

void increment_active_files ( int  ndx,
int  itemizing,
enum logcode  code 
)

io.c の 331 行で定義されています。

参照先 active_bytecnt, active_filecnt, check_for_finished_hlinks(), file_list::files, hlink_list, file_struct::length, read_msg_fd(), the_file_list.

参照元 recv_generator().

00332 {
              /* Count file ndx as active, first waiting (by processing
               * incoming messages) until the active set is below its limit. */
00333         /* TODO: tune these limits? */
              /* The limit is 10 active files once active_bytecnt has reached
               * 128*1024, otherwise 50.  Each pass handles pending hlink
               * entries and then reads one message with read_msg_fd(), which
               * can lower the counters via decrement_active_files(). */
00334         while (active_filecnt >= (active_bytecnt >= 128*1024 ? 10 : 50)) {
00335                 if (hlink_list.head)
00336                         check_for_finished_hlinks(itemizing, code);
00337                 read_msg_fd();
00338         }
00339 
00340         active_filecnt++;
00341         active_bytecnt += the_file_list->files[ndx]->length;
00342 }

void decrement_active_files ( int  ndx  ) 

io.c の 344 行で定義されています。

参照先 active_bytecnt, active_filecnt, file_list::files, file_struct::length, the_file_list.

参照元 read_msg_fd().

00345 {
              /* Undo the accounting done by increment_active_files() for
               * file ndx: one fewer active file, and its length removed from
               * the active byte total. */
00346         active_filecnt--;
00347         active_bytecnt -= the_file_list->files[ndx]->length;
00348 }

static int msg2genr_flush ( int  flush_it_all  )  [static]

io.c の 353 行で定義されています。

参照先 check_timeout(), errno, msg_list::head, m, msg_fd_out, select_timeout, msg_list::tail.

参照元 io_flush(), read_timeout(), send_msg().

00354 {
              /* Write queued msg2genr items to msg_fd_out.
               * Returns -1 when msg_fd_out is unset or write() fails with an
               * error other than EINTR/EWOULDBLOCK/EAGAIN, 0 when the write
               * would block and flush_it_all is false, and 1 once the queue
               * has been emptied. */
              /* Bytes of the head item already written; static so that a
               * partially written item is resumed on the next call. */
00355         static int written = 0;
00356         struct timeval tv;
00357         fd_set fds;
00358 
00359         if (msg_fd_out < 0)
00360                 return -1;
00361 
00362         while (msg2genr.head) {
00363                 struct msg_list_item *m = msg2genr.head;
00364                 int n = write(msg_fd_out, m->buf + written, m->len - written);
00365                 if (n < 0) {
00366                         if (errno == EINTR)
00367                                 continue;
00368                         if (errno != EWOULDBLOCK && errno != EAGAIN)
00369                                 return -1;
00370                         if (!flush_it_all)
00371                                 return 0;
                              /* Full flush requested: wait up to
                               * select_timeout seconds for the descriptor to
                               * become writable, checking the I/O timeout
                               * when select() returns 0, then retry. */
00372                         FD_ZERO(&fds);
00373                         FD_SET(msg_fd_out, &fds);
00374                         tv.tv_sec = select_timeout;
00375                         tv.tv_usec = 0;
00376                         if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
00377                                 check_timeout();
00378                 } else if ((written += n) == m->len) {
                              /* Head item fully written: unlink and free it,
                               * and reset the resume offset. */
00379                         msg2genr.head = m->next;
00380                         if (!msg2genr.head)
00381                                 msg2genr.tail = NULL;
00382                         free(m);
00383                         written = 0;
00384                 }
00385         }
00386         return 1;
00387 }

void send_msg ( enum msgcode  code,
char *  buf,
int  len 
)

io.c の 389 行で定義されています。

参照先 io_multiplex_write(), msg2genr_flush(), msg_fd_out, msg_list_add().

参照元 do_recv(), handle_delayed_updates(), log_delete(), recv_files(), recv_generator(), rwrite().

00390 {
              /* Without a message descriptor, send the message on the
               * multiplexed stream instead. */
00391         if (msg_fd_out < 0) {
00392                 io_multiplex_write(code, buf, len);
00393                 return;
00394         }
              /* Otherwise queue it on msg2genr and attempt a normal flush
               * (which gives up, rather than waits, if the write would
               * block). */
00395         msg_list_add(&msg2genr, code, buf, len);
00396         msg2genr_flush(NORMAL_FLUSH);
00397 }

int get_redo_num ( int  itemizing,
enum logcode  code 
)

io.c の 399 行で定義されています。

参照先 check_for_finished_hlinks(), flist_ndx_pop(), flist_ndx_list::head, hlink_list, read_msg_fd().

参照元 generate_files().

00400 {
              /* Return the next entry from redo_list, reading messages with
               * read_msg_fd() until one is available.  Pending hlink_list
               * entries are handled on every pass. */
00401         while (1) {
00402                 if (hlink_list.head)
00403                         check_for_finished_hlinks(itemizing, code);
00404                 if (redo_list.head)
00405                         break;
00406                 read_msg_fd();
00407         }
00408 
              /* The list is known to be non-empty here, so a -1 result is a
               * queued value (read_msg_fd() pushes -1 on MSG_DONE). */
00409         return flist_ndx_pop(&redo_list);
00410 }

int get_hlink_num ( void   ) 

io.c の 412 行で定義されています。

参照先 flist_ndx_pop(), hlink_list.

参照元 check_for_finished_hlinks().

00413 {
              /* Pop the next index from hlink_list; flist_ndx_pop() yields -1
               * when the list is empty.  Does not read any messages. */
00414         return flist_ndx_pop(&hlink_list);
00415 }

void io_set_filesfrom_fds ( int  f_in,
int  f_out 
)

When we're the receiver and we have a local --files-from list of names that needs to be sent over the socket to the sender, we have to do two things at the same time: send the sender a list of what files we're processing and read the incoming file+info list from the sender.

We do this by augmenting the read_timeout() function to copy this data. It uses the io_filesfrom_buf to read a block of data from f_in (when it is ready, since it might be a pipe) and then blast it out f_out (when it is ready to receive more data).

io.c の 427 行で定義されています。

参照先 io_filesfrom_bp, io_filesfrom_buf, io_filesfrom_buflen, io_filesfrom_f_in, io_filesfrom_f_out, io_filesfrom_lastchar.

参照元 client_run(), do_server_recv().

00428 {
              /* Record the descriptor pair that read_timeout() copies the
               * files-from data between, and reset the copy state: empty
               * buffer, write pointer at the buffer start, and a '\0' as the
               * "previous character". */
00429         io_filesfrom_f_in = f_in;
00430         io_filesfrom_f_out = f_out;
00431         io_filesfrom_bp = io_filesfrom_buf;
00432         io_filesfrom_lastchar = '\0';
00433         io_filesfrom_buflen = 0;
00434 }

static void whine_about_eof ( int  fd  )  [static]

io.c の 448 行で定義されています。

参照先 FERROR, kluge_around_eof, msleep(), rprintf(), sock_f_in, stats, stats::total_read, who_am_i().

参照元 read_timeout().

00449 {
              /* Report an unexpected end-of-file on fd and exit.  This
               * function does not return. */
              /* Special handling applies only to EOF on the socket input
               * while kluge_around_eof is non-zero: a positive value exits
               * quietly with status 0; otherwise (a negative value) we wait
               * before reporting the error. */
00450         if (kluge_around_eof && fd == sock_f_in) {
00451                 int i;
00452                 if (kluge_around_eof > 0)
00453                         exit_cleanup(0);
00454                 /* If we're still here after 10 seconds, exit with an error. */
                      /* 500 iterations of msleep(20). */
00455                 for (i = 10*1000/20; i--; )
00456                         msleep(20);
00457         }
00458 
00459         rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
00460                 "(%.0f bytes received so far) [%s]\n",
00461                 (double)stats.total_read, who_am_i());
00462 
00463         exit_cleanup(RERR_STREAMIO);
00464 }

static int read_timeout ( int  fd,
char *  buf,
size_t  len 
) [static]

Read from a socket with I/O timeout.

return the number of bytes read. If no bytes can be read then exit, never return a number <= 0.

TODO: If the remote shell connection fails, then current versions actually report an "unexpected EOF" error here. Since it's a fairly common mistake to try to use rsh when ssh is required, we should trap that: if we fail to read any data at all, we should give a better explanation. We can tell whether the connection has started by looking e.g. at whether the remote version is known yet.

io.c の 477 行で定義されています。

参照先 check_timeout(), close_multiplexing_out(), eol_nulls, errno, FERROR, FSOCKERR, msg_list::head, io_filesfrom_bp, io_filesfrom_buf, io_filesfrom_buflen, io_filesfrom_f_in, io_filesfrom_f_out, io_filesfrom_lastchar, io_flush(), io_timeout, last_io_in, msg2genr_flush(), msg_fd_out, rsyserr(), select_timeout, sock_f_in, whine_about_eof().

参照元 read_loop(), readfd_unbuffered().

00478 {
              /* Read up to len bytes from fd into buf and return the number
               * read (always > 0).  While waiting for fd to become readable,
               * the same select() loop also flushes queued msg2genr messages
               * and copies files-from data from io_filesfrom_f_in to
               * io_filesfrom_f_out. */
00479         int n, cnt = 0;
00480 
00481         io_flush(NORMAL_FLUSH);
00482 
00483         while (cnt == 0) {
00484                 /* until we manage to read *something* */
00485                 fd_set r_fds, w_fds;
00486                 struct timeval tv;
00487                 int maxfd = fd;
00488                 int count;
00489 
00490                 FD_ZERO(&r_fds);
00491                 FD_ZERO(&w_fds);
00492                 FD_SET(fd, &r_fds);
                      /* Watch msg_fd_out for writability only while there
                       * are messages queued for it. */
00493                 if (msg2genr.head) {
00494                         FD_SET(msg_fd_out, &w_fds);
00495                         if (msg_fd_out > maxfd)
00496                                 maxfd = msg_fd_out;
00497                 }
                      /* Files-from copying: with an empty buffer we wait to
                       * read more input (or stop copying once the input is
                       * gone); with buffered data we wait to write it. */
00498                 if (io_filesfrom_f_out >= 0) {
00499                         int new_fd;
00500                         if (io_filesfrom_buflen == 0) {
00501                                 if (io_filesfrom_f_in >= 0) {
00502                                         FD_SET(io_filesfrom_f_in, &r_fds);
00503                                         new_fd = io_filesfrom_f_in;
00504                                 } else {
00505                                         io_filesfrom_f_out = -1;
00506                                         new_fd = -1;
00507                                 }
00508                         } else {
00509                                 FD_SET(io_filesfrom_f_out, &w_fds);
00510                                 new_fd = io_filesfrom_f_out;
00511                         }
00512                         if (new_fd > maxfd)
00513                                 maxfd = new_fd;
00514                 }
00515 
00516                 tv.tv_sec = select_timeout;
00517                 tv.tv_usec = 0;
00518 
00519                 errno = 0;
00520 
00521                 count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
00522 
                      /* Timeout or error: EBADF is fatal; anything else just
                       * checks the I/O timeout and retries the select(). */
00523                 if (count <= 0) {
00524                         if (errno == EBADF)
00525                                 exit_cleanup(RERR_SOCKETIO);
00526                         check_timeout();
00527                         continue;
00528                 }
00529 
00530                 if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds))
00531                         msg2genr_flush(NORMAL_FLUSH);
00532 
00533                 if (io_filesfrom_f_out >= 0) {
00534                         if (io_filesfrom_buflen) {
                                      /* Drain buffered files-from data.  A
                                       * partial write advances the pointer; a
                                       * complete one rewinds it to the buffer
                                       * start.  A failed write silently stops
                                       * the copying. */
00535                                 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
00536                                         int l = write(io_filesfrom_f_out,
00537                                                       io_filesfrom_bp,
00538                                                       io_filesfrom_buflen);
00539                                         if (l > 0) {
00540                                                 if (!(io_filesfrom_buflen -= l))
00541                                                         io_filesfrom_bp = io_filesfrom_buf;
00542                                                 else
00543                                                         io_filesfrom_bp += l;
00544                                         } else {
00545                                                 /* XXX should we complain? */
00546                                                 io_filesfrom_f_out = -1;
00547                                         }
00548                                 }
00549                         } else if (io_filesfrom_f_in >= 0) {
00550                                 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
00551                                         int l = read(io_filesfrom_f_in,
00552                                                      io_filesfrom_buf,
00553                                                      sizeof io_filesfrom_buf);
00554                                         if (l <= 0) {
00555                                                 /* Send end-of-file marker */
                                                      /* Two '\0' bytes are sent
                                                       * when the previous data
                                                       * did not end in '\0'
                                                       * (the first terminates
                                                       * that name), otherwise
                                                       * one.  EOF and read
                                                       * errors are treated
                                                       * alike. */
00556                                                 io_filesfrom_buf[0] = '\0';
00557                                                 io_filesfrom_buf[1] = '\0';
00558                                                 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
00559                                                 io_filesfrom_f_in = -1;
00560                                         } else {
00561                                                 if (!eol_nulls) {
00562                                                         char *s = io_filesfrom_buf + l;
00563                                                         /* Transform CR and/or LF into '\0' */
00564                                                         while (s-- > io_filesfrom_buf) {
00565                                                                 if (*s == '\n' || *s == '\r')
00566                                                                         *s = '\0';
00567                                                         }
00568                                                 }
00569                                                 if (!io_filesfrom_lastchar) {
00570                                                         /* Last buf ended with a '\0', so don't
00571                                                          * let this buf start with one. */
00572                                                         while (l && !*io_filesfrom_bp)
00573                                                                 io_filesfrom_bp++, l--;
00574                                                 }
00575                                                 if (!l)
00576                                                         io_filesfrom_bp = io_filesfrom_buf;
00577                                                 else {
                                                              /* Compact in place:
                                                               * f reads, t writes,
                                                               * and l shrinks by
                                                               * one for each '\0'
                                                               * that is dropped. */
00578                                                         char *f = io_filesfrom_bp;
00579                                                         char *t = f;
00580                                                         char *eob = f + l;
00581                                                         /* Eliminate any multi-'\0' runs. */
00582                                                         while (f != eob) {
00583                                                                 if (!(*t++ = *f++)) {
00584                                                                         while (f != eob && !*f)
00585                                                                                 f++, l--;
00586                                                                 }
00587                                                         }
                                                              /* Remember the last
                                                               * input byte for the
                                                               * next buffer. */
00588                                                         io_filesfrom_lastchar = f[-1];
00589                                                 }
00590                                                 io_filesfrom_buflen = l;
00591                                         }
00592                                 }
00593                         }
00594                 }
00595 
                      /* select() woke us for another descriptor only. */
00596                 if (!FD_ISSET(fd, &r_fds))
00597                         continue;
00598 
00599                 n = read(fd, buf, len);
00600 
00601                 if (n <= 0) {
00602                         if (n == 0)
00603                                 whine_about_eof(fd); /* Doesn't return. */
                              /* Transient errors: go around and select() again. */
00604                         if (errno == EINTR || errno == EWOULDBLOCK
00605                             || errno == EAGAIN)
00606                                 continue;
00607 
00608                         /* Don't write errors on a dead socket. */
00609                         if (fd == sock_f_in) {
00610                                 close_multiplexing_out();
00611                                 rsyserr(FSOCKERR, errno, "read error");
00612                         } else
00613                                 rsyserr(FERROR, errno, "read error");
00614                         exit_cleanup(RERR_STREAMIO);
00615                 }
00616 
                      /* cnt is now non-zero, so the loop ends after this
                       * pass: only a single successful read() is performed. */
00617                 buf += n;
00618                 len -= n;
00619                 cnt += n;
00620 
                      /* Only socket input refreshes the timestamp used by
                       * check_timeout(). */
00621                 if (fd == sock_f_in && io_timeout)
00622                         last_io_in = time(NULL);
00623         }
00624 
00625         return cnt;
00626 }

int read_filesfrom_line ( int  fd,
char *  fname 
)

Read a line into the "fname" buffer (which must be at least MAXPATHLEN characters long).

io.c の 632 行で定義されています。

参照先 check_timeout(), eol_nulls, errno, filesfrom_host, select_timeout.

参照元 send_file_list().

00633 {
              /* Read one name from fd into fname, one byte at a time, and
               * return its length (0 when nothing was read, e.g. at EOF).
               * At most MAXPATHLEN - 1 characters are stored; any further
               * characters of an over-long line are read and discarded. */
00634         char ch, *s, *eob = fname + MAXPATHLEN - 1;
00635         int cnt;
00636         int reading_remotely = filesfrom_host != NULL;
              /* Names end at '\0' when eol_nulls is set or the list comes
               * from a remote host; otherwise they end at CR or LF. */
00637         int nulls = eol_nulls || reading_remotely;
00638 
00639   start:
00640         s = fname;
00641         while (1) {
00642                 cnt = read(fd, &ch, 1);
                      /* Transient failure: wait up to select_timeout seconds
                       * for input, check the I/O timeout if select() returned
                       * 0, and try the read again. */
00643                 if (cnt < 0 && (errno == EWOULDBLOCK
00644                   || errno == EINTR || errno == EAGAIN)) {
00645                         struct timeval tv;
00646                         fd_set fds;
00647                         FD_ZERO(&fds);
00648                         FD_SET(fd, &fds);
00649                         tv.tv_sec = select_timeout;
00650                         tv.tv_usec = 0;
00651                         if (!select(fd+1, &fds, NULL, NULL, &tv))
00652                                 check_timeout();
00653                         continue;
00654                 }
                      /* EOF or a non-transient error ends the line. */
00655                 if (cnt != 1)
00656                         break;
00657                 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
00658                         /* Skip empty lines if reading locally. */
00659                         if (!reading_remotely && s == fname)
00660                                 continue;
00661                         break;
00662                 }
00663                 if (s < eob)
00664                         *s++ = ch;
00665         }
00666         *s = '\0';
00667 
00668         /* Dump comments. */
              /* A line starting with '#' or ';' is discarded and the next
               * line is read in its place. */
00669         if (*fname == '#' || *fname == ';')
00670                 goto start;
00671 
00672         return s - fname;
00673 }

void io_start_buffering_out ( void   ) 

io.c の 678 行で定義されています。

参照先 iobuf_out, iobuf_out_cnt, out_of_memory().

参照元 client_run(), do_recv(), do_server_sender(), io_start_multiplex_out(), send_file_list().

00679 {
              /* Allocate the output buffer (IO_BUFFER_SIZE bytes) and mark it
               * empty.  Does nothing if a buffer is already allocated. */
00680         if (iobuf_out)
00681                 return;
00682         if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
00683                 out_of_memory("io_start_buffering_out");
00684         iobuf_out_cnt = 0;
00685 }

void io_start_buffering_in ( void   ) 

io.c の 690 行で定義されています。

参照先 iobuf_in, iobuf_in_siz, out_of_memory().

参照元 do_server_recv(), do_server_sender(), io_start_multiplex_in().

00691 {
              /* Allocate the input buffer, sized at twice IO_BUFFER_SIZE.
               * Does nothing if a buffer is already allocated. */
00692         if (iobuf_in)
00693                 return;
00694         iobuf_in_siz = 2 * IO_BUFFER_SIZE;
00695         if (!(iobuf_in = new_array(char, iobuf_in_siz)))
00696                 out_of_memory("io_start_buffering_in");
00697 }

void io_end_buffering ( void   ) 

io.c の 699 行で定義されています。

参照先 io_flush(), io_multiplexing_out, iobuf_out.

参照元 send_file_list().

00700 {
              /* Flush pending output, then release the output buffer -- but
               * only when output multiplexing is off; with multiplexing on,
               * the buffer is kept. */
00701         io_flush(NORMAL_FLUSH);
00702         if (!io_multiplexing_out) {
00703                 free(iobuf_out);
00704                 iobuf_out = NULL;
00705         }
00706 }

void maybe_flush_socket ( void   ) 

io.c708 行で定義されています。

参照先 io_flush(), iobuf_out, iobuf_out_cnt, last_io_out.

参照元 generate_files().

00709 {
00710         if (iobuf_out && iobuf_out_cnt && time(NULL) - last_io_out >= 5)
00711                 io_flush(NORMAL_FLUSH);
00712 }

void maybe_send_keepalive ( void   ) 

io.c714 行で定義されています。

参照先 allowed_lull, file_list::count, io_flush(), iobuf_out, iobuf_out_cnt, last_io_out, protocol_version, sock_f_out, the_file_list, write_int(), write_shortint().

参照元 delete_in_dir()generate_files()receive_sums().

00715 {
00716         if (time(NULL) - last_io_out >= allowed_lull) {
00717                 if (!iobuf_out || !iobuf_out_cnt) {
00718                         if (protocol_version < 29)
00719                                 return; /* there's nothing we can do */
00720                         write_int(sock_f_out, the_file_list->count);
00721                         write_shortint(sock_f_out, ITEM_IS_NEW);
00722                 }
00723                 if (iobuf_out)
00724                         io_flush(NORMAL_FLUSH);
00725         }
00726 }

static int readfd_unbuffered ( int  fd,
char *  buf,
size_t  len 
) [static]

Read from the file descriptor handling multiplexing - return number of bytes read.

Never returns <= 0.

io.c748 行で定義されています。

参照先 FERRORio_flush()io_multiplexing_iniobuf_iniobuf_in_sizlog_delete()MSG_DATAMSG_DELETEDMSG_ERRORMSG_INFOMSG_SUCCESSout_of_memory()read_loop()read_timeout()rprintf()rwrite()sock_f_insuccessful_send()who_am_i().

参照元 readfd()writefd_unbuffered().

/* Body of readfd_unbuffered(fd, buf, len).
 *
 * Copies up to len bytes into buf and returns the number copied.  Reads that
 * do not involve the buffered socket (no iobuf_in, or fd != sock_f_in) go
 * straight to read_timeout().  Otherwise data is served from iobuf_in; when
 * input multiplexing is on, each refill reads one tagged message from the
 * stream and non-data messages are dispatched here instead of being returned
 * to the caller. */
00749 {
      /* Bytes still unread in iobuf_in, and the offset of the next one;
       * both persist across calls. */
00750         static size_t remaining;
00751         static size_t iobuf_in_ndx;
00752         size_t msg_bytes;
00753         int tag, cnt = 0;
00754         char line[BIGPATHBUFLEN];
00755 
00756         if (!iobuf_in || fd != sock_f_in)
00757                 return read_timeout(fd, buf, len);
00758 
      /* Plain (non-multiplexed) buffered input: refill the whole buffer. */
00759         if (!io_multiplexing_in && remaining == 0) {
00760                 remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
00761                 iobuf_in_ndx = 0;
00762         }
00763 
00764         while (cnt == 0) {
              /* Serve the caller from buffered data when there is any. */
00765                 if (remaining) {
00766                         len = MIN(len, remaining);
00767                         memcpy(buf, iobuf_in + iobuf_in_ndx, len);
00768                         iobuf_in_ndx += len;
00769                         remaining -= len;
00770                         cnt = len;
00771                         break;
00772                 }
00773 
              /* Read the 4-byte message header: the low 24 bits are the
               * payload length, the top byte (less MPLEX_BASE) is the tag. */
00774                 read_loop(fd, line, 4);
00775                 tag = IVAL(line, 0);
00776 
00777                 msg_bytes = tag & 0xFFFFFF;
00778                 tag = (tag >> 24) - MPLEX_BASE;
00779 
00780                 switch (tag) {
00781                 case MSG_DATA:
                      /* Grow the input buffer if this payload will not fit,
                       * then load the payload for the next loop pass. */
00782                         if (msg_bytes > iobuf_in_siz) {
00783                                 if (!(iobuf_in = realloc_array(iobuf_in, char,
00784                                                                msg_bytes)))
00785                                         out_of_memory("readfd_unbuffered");
00786                                 iobuf_in_siz = msg_bytes;
00787                         }
00788                         read_loop(fd, iobuf_in, msg_bytes);
00789                         remaining = msg_bytes;
00790                         iobuf_in_ndx = 0;
00791                         break;
00792                 case MSG_DELETED:
                      /* The payload must leave room for a terminating null. */
00793                         if (msg_bytes >= sizeof line)
00794                                 goto overflow;
00795                         read_loop(fd, line, msg_bytes);
00796                         /* A directory name was sent with the trailing null */
00797                         if (msg_bytes > 0 && !line[msg_bytes-1])
00798                                 log_delete(line, S_IFDIR);
00799                         else {
00800                                 line[msg_bytes] = '\0';
00801                                 log_delete(line, S_IFREG);
00802                         }
00803                         break;
00804                 case MSG_SUCCESS:
                      /* The payload is exactly one 4-byte integer, which is
                       * handed to successful_send(). */
00805                         if (msg_bytes != 4) {
00806                                 rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
00807                                         tag, (long)msg_bytes, who_am_i());
00808                                 exit_cleanup(RERR_STREAMIO);
00809                         }
00810                         read_loop(fd, line, msg_bytes);
00811                         successful_send(IVAL(line, 0));
00812                         break;
00813                 case MSG_INFO:
00814                 case MSG_ERROR:
                      /* Message text is passed to rwrite() with the tag
                       * used as the log code. */
00815                         if (msg_bytes >= sizeof line) {
00816                             overflow:
00817                                 rprintf(FERROR,
00818                                         "multiplexing overflow %d:%ld [%s]\n",
00819                                         tag, (long)msg_bytes, who_am_i());
00820                                 exit_cleanup(RERR_STREAMIO);
00821                         }
00822                         read_loop(fd, line, msg_bytes);
00823                         rwrite((enum logcode)tag, line, msg_bytes);
00824                         break;
00825                 default:
00826                         rprintf(FERROR, "unexpected tag %d [%s]\n",
00827                                 tag, who_am_i());
00828                         exit_cleanup(RERR_STREAMIO);
00829                 }
00830         }
00831 
      /* Once the input buffer has been drained, flush pending output. */
00832         if (remaining == 0)
00833                 io_flush(NORMAL_FLUSH);
00834 
00835         return cnt;
00836 }

static void readfd ( int  fd,
char *  buffer,
size_t  N 
) [static]

Do a buffered read from fd.

Don't return until all N bytes have been read. If all N bytes can't be read then exit with an error.

io.c843 行で定義されています。

参照先 batch_fdreadfd_unbuffered()sock_f_instatsstats::total_readwrite_batch_monitor_in.

参照元 read_buf()read_byte()read_int()read_longint()read_sbuf()read_shortint()read_vstring().

00844 {
00845         int  cnt;
00846         size_t total = 0;
00847 
00848         while (total < N) {
00849                 cnt = readfd_unbuffered(fd, buffer + total, N-total);
00850                 total += cnt;
00851         }
00852 
00853         if (fd == write_batch_monitor_in) {
00854                 if ((size_t)write(batch_fd, buffer, total) != total)
00855                         exit_cleanup(RERR_FILEIO);
00856         }
00857 
00858         if (fd == sock_f_in)
00859                 stats.total_read += total;
00860 }

int read_shortint ( int  f  ) 

io.c862 行で定義されています。

参照先 readfd().

参照元 do_recv()read_final_goodbye()read_item_attrs().

00863 {
00864         uchar b[2];
00865         readfd(f, (char *)b, 2);
00866         return (b[1] << 8) + b[0];
00867 }

int32 read_int ( int  f  ) 

io.c869 行で定義されています。

参照先 readfd().

参照元 do_recv()get_next_gen_i()read_final_goodbye()read_longint()read_stream_flags()read_sum_head()receive_acl()receive_file_entry()receive_rsync_acl()receive_sums()receive_xattr()recv_deflated_token()recv_file_list()recv_files()recv_filter_list()recv_uid_list()send_files()setup_protocol()simple_recv_token().

00870 {
00871         char b[4];
00872         int32 num;
00873 
00874         readfd(f,b,4);
00875         num = IVAL(b,0);
00876         if (num == (int32)0xffffffff)
00877                 return -1;
00878         return num;
00879 }

int64 read_longint ( int  f  ) 

io.c881 行で定義されています。

参照先 FERRORread_int()readfd()rprintf().

参照元 handle_stats()receive_file_entry().

00882 {
00883         int64 num;
00884         char b[8];
00885         num = read_int(f);
00886 
00887         if ((int32)num != (int32)0xffffffff)
00888                 return num;
00889 
00890 #if SIZEOF_INT64 < 8
00891         rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
00892         exit_cleanup(RERR_UNSUPPORTED);
00893 #else
00894         readfd(f,b,8);
00895         num = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
00896 #endif
00897 
00898         return num;
00899 }

void read_buf ( int  f,
char *  buf,
size_t  len 
)

io.c901 行で定義されています。

参照先 readfd().

参照元 read_line()receive_data()receive_file_entry()receive_sums()receive_xattr()recv_deflated_token()simple_recv_token().

/* Read exactly len bytes from f into buf; a public wrapper for readfd(). */
void read_buf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
}

void read_sbuf ( int  f,
char *  buf,
size_t  len 
)

io.c906 行で定義されています。

参照先 readfd().

参照元 receive_file_entry()recv_filter_list()recv_uid_list().

/* Read exactly len bytes from f into buf and null-terminate the result.
 * The terminator is stored at buf[len], so buf must have room for len + 1
 * bytes. */
void read_sbuf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
	*(buf + len) = '\0';
}

uchar read_byte ( int  f  ) 

io.c912 行で定義されています。

参照先 readfd().

参照元 read_item_attrs()read_vstring()receive_acl()receive_file_entry()receive_rsync_acl()receive_xattr()recv_deflated_token()recv_file_list()recv_uid_list().

00913 {
00914         uchar c;
00915         readfd(f, (char *)&c, 1);
00916         return c;
00917 }

int read_vstring ( int  f,
char *  buf,
int  bufsize 
)

io.c919 行で定義されています。

参照先 FERRORread_byte()readfd()rprintf().

参照元 read_item_attrs().

00920 {
00921         int len = read_byte(f);
00922 
00923         if (len & 0x80)
00924                 len = (len & ~0x80) * 0x100 + read_byte(f);
00925 
00926         if (len >= bufsize) {
00927                 rprintf(FERROR, "over-long vstring received (%d > %d)\n",
00928                         len, bufsize - 1);
00929                 return -1;
00930         }
00931 
00932         if (len)
00933                 readfd(f, buf, len);
00934         buf[len] = '\0';
00935         return len;
00936 }

void read_sum_head ( int  f,
struct sum_struct *  sum 
)

io.c940 行で定義されています。

参照先 sum_struct::blengthsum_struct::countcsum_lengthFERRORprotocol_versionread_int()sum_struct::remainderrprintf()sum_struct::s2lengthwho_am_i().

参照元 receive_data()receive_sums().

00941 {
00942         sum->count = read_int(f);
00943         if (sum->count < 0) {
00944                 rprintf(FERROR, "Invalid checksum count %ld [%s]\n",
00945                         (long)sum->count, who_am_i());
00946                 exit_cleanup(RERR_PROTOCOL);
00947         }
00948         sum->blength = read_int(f);
00949         if (sum->blength < 0 || sum->blength > MAX_BLOCK_SIZE) {
00950                 rprintf(FERROR, "Invalid block length %ld [%s]\n",
00951                         (long)sum->blength, who_am_i());
00952                 exit_cleanup(RERR_PROTOCOL);
00953         }
00954         sum->s2length = protocol_version < 27 ? csum_length : (int)read_int(f);
00955         if (sum->s2length < 0 || sum->s2length > MD4_SUM_LENGTH) {
00956                 rprintf(FERROR, "Invalid checksum length %d [%s]\n",
00957                         sum->s2length, who_am_i());
00958                 exit_cleanup(RERR_PROTOCOL);
00959         }
00960         sum->remainder = read_int(f);
00961         if (sum->remainder < 0 || sum->remainder > sum->blength) {
00962                 rprintf(FERROR, "Invalid remainder length %ld [%s]\n",
00963                         (long)sum->remainder, who_am_i());
00964                 exit_cleanup(RERR_PROTOCOL);
00965         }
00966 }

void write_sum_head ( int  f,
struct sum_struct *  sum 
)

io.c971 行で定義されています。

参照先 sum_struct::blengthsum_struct::countprotocol_versionsum_struct::remaindersum_struct::s2lengthwrite_int().

参照元 generate_and_send_sums()recv_generator()send_files().

00972 {
00973         static struct sum_struct null_sum;
00974 
00975         if (sum == NULL)
00976                 sum = &null_sum;
00977 
00978         write_int(f, sum->count);
00979         write_int(f, sum->blength);
00980         if (protocol_version >= 27)
00981                 write_int(f, sum->s2length);
00982         write_int(f, sum->remainder);
00983 }

static void sleep_for_bwlimit ( int  bytes_written  )  [static]

Sleep after writing to limit I/O bandwidth usage.

TODO:
Rather than sleeping after each write, it might be better to use some kind of averaging. The current algorithm seems to always use a bit less bandwidth than specified, because it doesn't make up for slow periods. But arguably this is a feature. In addition, we ought to take the time used to write the data into account.
During some phases of big transfers (file FOO is uptodate) this is called with a small bytes_written every time. As the kernel has to round small waits up to guarantee that we actually wait at least the requested number of microseconds, this can become grossly inaccurate. We therefore keep track of the bytes we've written over time and only sleep when the accumulated delay is at least 1 tenth of a second.

io.c1001 行で定義されています。

参照先 bwlimitbwlimit_writemaxtotal_written.

参照元 writefd_unbuffered().

01002 {
01003         static struct timeval prior_tv;
01004         static long total_written = 0; 
01005         struct timeval tv, start_tv;
01006         long elapsed_usec, sleep_usec;
01007 
01008 #define ONE_SEC 1000000L /* # of microseconds in a second */
01009 
01010         if (!bwlimit_writemax)
01011                 return;
01012 
01013         total_written += bytes_written; 
01014 
01015         gettimeofday(&start_tv, NULL);
01016         if (prior_tv.tv_sec) {
01017                 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
01018                              + (start_tv.tv_usec - prior_tv.tv_usec);
01019                 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
01020                 if (total_written < 0)
01021                         total_written = 0;
01022         }
01023 
01024         sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
01025         if (sleep_usec < ONE_SEC / 10) {
01026                 prior_tv = start_tv;
01027                 return;
01028         }
01029 
01030         tv.tv_sec  = sleep_usec / ONE_SEC;
01031         tv.tv_usec = sleep_usec % ONE_SEC;
01032         select(0, NULL, NULL, NULL, &tv);
01033 
01034         gettimeofday(&prior_tv, NULL);
01035         elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
01036                      + (prior_tv.tv_usec - start_tv.tv_usec);
01037         total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
01038 }

static void writefd_unbuffered ( int  fd,
char *  buf,
size_t  len 
) [static]

io.c1046 行で定義されています。

参照先 am_generatorbwlimit_writemaxcheck_timeout()close_multiplexing_out()defer_forwarding_messageserrnoFERRORignore_timeoutio_filesfrom_bufio_multiplexing_inio_timeoutlast_io_outmsg_fd_inmsleep()no_flushread_msg_fd()readfd_unbuffered()rsyserr()select_timeoutset_io_timeout()sleep_for_bwlimit()sock_f_insock_f_outwho_am_i().

参照元 io_flush()mplex_write()msg2sndr_flush()writefd().

/* Body of writefd_unbuffered(fd, buf, len).
 *
 * Writes all len bytes of buf to fd, waiting with select() until fd is
 * writable.  While waiting it also watches msg_fd_in (when set) and
 * services it with read_msg_fd().  A write failure other than
 * EINTR/EWOULDBLOCK/EAGAIN is fatal: the error is logged and the program
 * exits with RERR_STREAMIO.  io_flush() is suppressed for the duration via
 * the no_flush counter. */
01047 {
01048         size_t n, total = 0;
01049         fd_set w_fds, r_fds;
01050         int maxfd, count, cnt, using_r_fds;
      /* Saved so the flag can be restored on the way out. */
01051         int defer_save = defer_forwarding_messages;
01052         struct timeval tv;
01053 
01054         no_flush++;
01055 
01056         while (total < len) {
01057                 FD_ZERO(&w_fds);
01058                 FD_SET(fd,&w_fds);
01059                 maxfd = fd;
01060 
              /* Also watch the message fd for input, if there is one. */
01061                 if (msg_fd_in >= 0) {
01062                         FD_ZERO(&r_fds);
01063                         FD_SET(msg_fd_in,&r_fds);
01064                         if (msg_fd_in > maxfd)
01065                                 maxfd = msg_fd_in;
01066                         using_r_fds = 1;
01067                 } else
01068                         using_r_fds = 0;
01069 
01070                 tv.tv_sec = select_timeout;
01071                 tv.tv_usec = 0;
01072 
01073                 errno = 0;
01074                 count = select(maxfd + 1, using_r_fds ? &r_fds : NULL,
01075                                &w_fds, NULL, &tv);
01076 
              /* Timeout or select error: a bad descriptor is fatal, anything
               * else runs the timeout check and retries. */
01077                 if (count <= 0) {
01078                         if (count < 0 && errno == EBADF)
01079                                 exit_cleanup(RERR_SOCKETIO);
01080                         check_timeout();
01081                         continue;
01082                 }
01083 
01084                 if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds))
01085                         read_msg_fd();
01086 
01087                 if (!FD_ISSET(fd, &w_fds))
01088                         continue;
01089 
              /* Never write more than bwlimit_writemax bytes at once when a
               * bandwidth limit is in effect. */
01090                 n = len - total;
01091                 if (bwlimit_writemax && n > bwlimit_writemax)
01092                         n = bwlimit_writemax;
01093                 cnt = write(fd, buf + total, n);
01094 
01095                 if (cnt <= 0) {
01096                         if (cnt < 0) {
01097                                 if (errno == EINTR)
01098                                         continue;
01099                                 if (errno == EWOULDBLOCK || errno == EAGAIN) {
01100                                         msleep(1);
01101                                         continue;
01102                                 }
01103                         }
01104 
01105                         /* Don't try to write errors back across the stream. */
01106                         if (fd == sock_f_out)
01107                                 close_multiplexing_out();
01108                         rsyserr(FERROR, errno,
01109                                 "writefd_unbuffered failed to write %ld bytes [%s]",
01110                                 (long)len, who_am_i());
01111                         /* If the other side is sending us error messages, try
01112                          * to grab any messages they sent before they died. */
                      /* NOTE(review): this loop has no exit of its own; it
                       * presumably ends when the read path exits the program
                       * (EOF or timeout) -- confirm against read_timeout(). */
01113                         while (fd == sock_f_out && io_multiplexing_in) {
01114                                 set_io_timeout(30);
01115                                 ignore_timeout = 0;
01116                                 readfd_unbuffered(sock_f_in, io_filesfrom_buf,
01117                                                   sizeof io_filesfrom_buf);
01118                         }
01119                         exit_cleanup(RERR_STREAMIO);
01120                 }
01121 
01122                 total += cnt;
01123                 defer_forwarding_messages = 1;
01124 
              /* Socket writes update the last-output time (when a timeout is
               * set or we are the generator) and are rate-limited. */
01125                 if (fd == sock_f_out) {
01126                         if (io_timeout || am_generator)
01127                                 last_io_out = time(NULL);
01128                         sleep_for_bwlimit(cnt);
01129                 }
01130         }
01131 
01132         defer_forwarding_messages = defer_save;
01133         no_flush--;
01134 }

static void msg2sndr_flush ( void   )  [static]

io.c1136 行で定義されています。

参照先 defer_forwarding_messagesmsg_list::headio_multiplexing_outmmsg2sndrsock_f_outstatsmsg_list::tailstats::total_writtenwritefd_unbuffered().

参照元 io_flush()mplex_write().

01137 {
01138         if (defer_forwarding_messages)
01139                 return;
01140 
01141         while (msg2sndr.head && io_multiplexing_out) {
01142                 struct msg_list_item *m = msg2sndr.head;
01143                 if (!(msg2sndr.head = m->next))
01144                         msg2sndr.tail = NULL;
01145                 stats.total_written += m->len;
01146                 defer_forwarding_messages = 1;
01147                 writefd_unbuffered(sock_f_out, m->buf, m->len);
01148                 defer_forwarding_messages = 0;
01149                 free(m);
01150         }
01151 }

static void mplex_write ( enum msgcode  code,
char *  buf,
size_t  len 
) [static]

Write a message to a multiplexed stream.

If this fails then rsync exits.

io.c1157 行で定義されています。

参照先 defer_forwarding_messagesmsg2sndr_flush()sock_f_outwritefd_unbuffered().

参照元 io_flush()io_multiplex_write().

01158 {
01159         char buffer[1024];
01160         size_t n = len;
01161 
01162         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
01163 
01164         if (n > sizeof buffer - 4)
01165                 n = 0;
01166         else
01167                 memcpy(buffer + 4, buf, n);
01168 
01169         writefd_unbuffered(sock_f_out, buffer, n+4);
01170 
01171         len -= n;
01172         buf += n;
01173 
01174         if (len) {
01175                 defer_forwarding_messages = 1;
01176                 writefd_unbuffered(sock_f_out, buf, len);
01177                 defer_forwarding_messages = 0;
01178                 msg2sndr_flush();
01179         }
01180 }

void io_flush ( int  flush_it_all  ) 

io.c1182 行で定義されています。

参照先 io_multiplexing_outiobuf_outiobuf_out_cntmplex_write()msg2genr_flush()msg2sndr_flush()MSG_DATAno_flushsock_f_outwritefd_unbuffered().

参照元 _exit_cleanup()client_run()do_recv()do_server_sender()io_end_buffering()io_multiplex_write()io_start_multiplex_in()io_start_multiplex_out()maybe_flush_socket()maybe_send_keepalive()read_timeout()readfd_unbuffered()wait_process_with_flush()writefd().

01183 {
01184         msg2genr_flush(flush_it_all);
01185         msg2sndr_flush();
01186 
01187         if (!iobuf_out_cnt || no_flush)
01188                 return;
01189 
01190         if (io_multiplexing_out)
01191                 mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
01192         else
01193                 writefd_unbuffered(sock_f_out, iobuf_out, iobuf_out_cnt);
01194         iobuf_out_cnt = 0;
01195 }

static void writefd ( int  fd,
char *  buf,
size_t  len 
) [static]

io.c1197 行で定義されています。

参照先 batch_fdFERRORio_flush()iobuf_outiobuf_out_cntmsg_fd_outrprintf()sock_f_outstatsstats::total_writtenwrite_batch_monitor_outwritefd_unbuffered().

参照元 write_buf()write_byte()write_int()write_longint()write_sbuf()write_shortint()write_vstring().

01198 {
01199         if (fd == msg_fd_out) {
01200                 rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
01201                 exit_cleanup(RERR_PROTOCOL);
01202         }
01203 
01204         if (fd == sock_f_out)
01205                 stats.total_written += len;
01206 
01207         if (fd == write_batch_monitor_out) {
01208                 if ((size_t)write(batch_fd, buf, len) != len)
01209                         exit_cleanup(RERR_FILEIO);
01210         }
01211 
01212         if (!iobuf_out || fd != sock_f_out) {
01213                 writefd_unbuffered(fd, buf, len);
01214                 return;
01215         }
01216 
01217         while (len) {
01218                 int n = MIN((int)len, IO_BUFFER_SIZE - iobuf_out_cnt);
01219                 if (n > 0) {
01220                         memcpy(iobuf_out+iobuf_out_cnt, buf, n);
01221                         buf += n;
01222                         len -= n;
01223                         iobuf_out_cnt += n;
01224                 }
01225 
01226                 if (iobuf_out_cnt == IO_BUFFER_SIZE)
01227                         io_flush(NORMAL_FLUSH);
01228         }
01229 }

void write_shortint ( int  f,
int  x 
)

io.c1231 行で定義されています。

参照先 writefd().

参照元 itemize()maybe_send_keepalive()read_final_goodbye()write_ndx_and_attrs().

01232 {
01233         uchar b[2];
01234         b[0] = x;
01235         b[1] = x >> 8;
01236         writefd(f, (char *)b, 2);
01237 }

void write_int ( int  f,
int32  x 
)

io.c1239 行で定義されています。

参照先 writefd().

参照元 do_recv()generate_and_send_sums()generate_files()itemize()maybe_send_keepalive()read_final_goodbye()recv_generator()send_acl()send_deflated_token()send_file_entry()send_file_list()send_files()send_filter_list()send_ida_list()send_rsync_acl()send_rules()send_uid_list()send_xattr()setup_protocol()simple_send_token()start_write_batch()write_longint()write_ndx_and_attrs()write_stream_flags()write_sum_head().

01240 {
01241         char b[4];
01242         SIVAL(b,0,x);
01243         writefd(f,b,4);
01244 }

void write_longint ( int  f,
int64  x 
)

io.c1250 行で定義されています。

参照先 FERRORrprintf()write_int()writefd().

参照元 handle_stats()send_file_entry().

01251 {
01252         char b[8];
01253 
01254         if (x <= 0x7FFFFFFF) {
01255                 write_int(f, (int)x);
01256                 return;
01257         }
01258 
01259 #if SIZEOF_INT64 < 8
01260         rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
01261         exit_cleanup(RERR_UNSUPPORTED);
01262 #else
01263         write_int(f, (int32)0xFFFFFFFF);
01264         SIVAL(b,0,(x&0xFFFFFFFF));
01265         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
01266 
01267         writefd(f,b,8);
01268 #endif
01269 }

void write_buf ( int  f,
char *  buf,
size_t  len 
)

io.c1271 行で定義されています。

参照先 writefd().

参照元 finish_pre_exec()generate_and_send_sums()send_deflated_token()send_file_entry()send_rules()send_uid_list()send_xattr()simple_send_token()write_filter_rules().

/* Write len bytes of buf to f; a public wrapper for writefd(). */
void write_buf(int f, char *buf, size_t len)
{
	writefd(f, buf, len);
}

void write_sbuf ( int  f,
char *  buf 
)

Write a string to the connection

io.c1277 行で定義されています。

参照先 writefd().

参照元 io_printf()write_batch_shell_file()write_filter_rules().

/* Write the null-terminated string buf to f.  The terminating null itself
 * is not sent. */
void write_sbuf(int f, char *buf)
{
	size_t len = strlen(buf);

	writefd(f, buf, len);
}

void write_byte ( int  f,
uchar  c 
)

io.c1282 行で定義されています。

参照先 writefd().

参照元 finish_pre_exec()itemize()rsync_module()send_acl()send_deflated_token()send_file_entry()send_ida_list()send_rsync_acl()send_rules()send_uid_list()send_xattr()write_batch_shell_file()write_filter_rules()write_ndx_and_attrs().

01283 {
01284         writefd(f, (char *)&c, 1);
01285 }

void write_vstring ( int  f,
char *  str,
int  len 
)

io.c1287 行で定義されています。

参照先 FERRORrprintf()writefd().

参照元 itemize()write_ndx_and_attrs().

01288 {
01289         uchar lenbuf[3], *lb = lenbuf;
01290 
01291         if (len > 0x7F) {
01292                 if (len > 0x7FFF) {
01293                         rprintf(FERROR,
01294                                 "attempting to send over-long vstring (%d > %d)\n",
01295                                 len, 0x7FFF);
01296                         exit_cleanup(RERR_PROTOCOL);
01297                 }
01298                 *lb++ = len / 0x100 + 0x80;
01299         }
01300         *lb = len;
01301 
01302         writefd(f, (char*)lenbuf, lb - lenbuf + 1);
01303         if (len)
01304                 writefd(f, str, len);
01305 }

int read_line ( int  f,
char *  buf,
size_t  maxlen 
)

Read a line of up to maxlen characters into buf (not counting the trailing null).

Strips the (required) trailing newline and all carriage returns.

戻り値:
1 for success; 0 for I/O error or truncation.

io.c1314 行で定義されています。

参照先 read_buf().

参照元 auth_server()rsync_module()start_daemon()start_inband_exchange().

/* Read one line from f into buf, one byte at a time.
 *
 * Carriage returns are dropped and the terminating newline is not stored.
 * Reading stops once maxlen characters have been stored, so a line with
 * maxlen or more characters counts as truncated.  A null is stored after
 * the last character kept, so buf needs room for maxlen + 1 bytes.
 *
 * Returns 1 on success; 0 if a null byte was read (or none arrived) or if
 * the line was truncated. */
int read_line(int f, char *buf, size_t maxlen)
{
	while (maxlen > 0) {
		/* Pre-clear the slot so an unfilled read is detectable. */
		*buf = 0;
		read_buf(f, buf, 1);

		if (*buf == 0)
			return 0;
		if (*buf == '\n')
			break;
		if (*buf == '\r')
			continue;

		buf++;
		maxlen--;
	}

	*buf = '\0';
	return maxlen > 0;
}

void io_printf ( int  fd,
const char *  format,
  ... 
)

io.c1332 行で定義されています。

参照先 bufFERRORmsg_list_item::lenrprintf()vsnprintf()write_sbuf().

参照元 auth_client()auth_server()rsync_module()send_listing()start_daemon()start_inband_exchange().

01333 {
01334         va_list ap;
01335         char buf[BIGPATHBUFLEN];
01336         int len;
01337 
01338         va_start(ap, format);
01339         len = vsnprintf(buf, sizeof buf, format, ap);
01340         va_end(ap);
01341 
01342         if (len < 0)
01343                 exit_cleanup(RERR_STREAMIO);
01344 
01345         if (len > (int)sizeof buf) {
01346                 rprintf(FERROR, "io_printf() was too long for the buffer.\n");
01347                 exit_cleanup(RERR_STREAMIO);
01348         }
01349 
01350         write_sbuf(fd, buf);
01351 }

void io_start_multiplex_out ( void   ) 

Setup for multiplexing a MSG_* stream with the data stream.

io.c1354 行で定義されています。

参照先 io_flush()io_multiplexing_outio_start_buffering_out().

参照元 client_run()rsync_module()start_server().

01355 {
01356         io_flush(NORMAL_FLUSH);
01357         io_start_buffering_out();
01358         io_multiplexing_out = 1;
01359 }

void io_start_multiplex_in ( void   ) 

Setup for multiplexing a MSG_* stream with the data stream.

io.c1362 行で定義されています。

参照先 io_flush()io_multiplexing_inio_start_buffering_in().

参照元 client_run()start_inband_exchange()start_server().

01363 {
01364         io_flush(NORMAL_FLUSH);
01365         io_start_buffering_in();
01366         io_multiplexing_in = 1;
01367 }

int io_multiplex_write ( enum msgcode  code,
char *  buf,
size_t  len 
)

Write a message to the multiplexed data stream.

io.c1370 行で定義されています。

参照先 io_flush()io_multiplexing_outmplex_write()statsstats::total_written.

参照元 read_msg_fd()rwrite()send_msg().

01371 {
01372         if (!io_multiplexing_out)
01373                 return 0;
01374 
01375         io_flush(NORMAL_FLUSH);
01376         stats.total_written += (len+4);
01377         mplex_write(code, buf, len);
01378         return 1;
01379 }

void close_multiplexing_in ( void   ) 

io.c1381 行で定義されています。

参照先 io_multiplexing_in.

参照元 do_recv().

01382 {
01383         io_multiplexing_in = 0;
01384 }

void close_multiplexing_out ( void   ) 

Stop output multiplexing.

io.c1387 行で定義されています。

参照先 io_multiplexing_out.

参照元 do_recv()read_msg_fd()read_timeout()writefd_unbuffered().

01388 {
01389         io_multiplexing_out = 0;
01390 }

void start_write_batch ( int  fd  ) 

io.c1392 行で定義されています。

参照先 am_senderbatch_fdchecksum_seedprotocol_versionwrite_batch_monitor_inwrite_batch_monitor_outwrite_int()write_stream_flags().

参照元 client_run().

01393 {
01394         write_stream_flags(batch_fd);
01395 
01396         /* Some communication has already taken place, but we don't
01397          * enable batch writing until here so that we can write a
01398          * canonical record of the communication even though the
01399          * actual communication so far depends on whether a daemon
01400          * is involved. */
01401         write_int(batch_fd, protocol_version);
01402         write_int(batch_fd, checksum_seed);
01403 
01404         if (am_sender)
01405                 write_batch_monitor_out = fd;
01406         else
01407                 write_batch_monitor_in = fd;
01408 }

void stop_write_batch ( void   ) 

io.c1410 行で定義されています。

参照先 write_batch_monitor_inwrite_batch_monitor_out.

参照元 do_recv().

01411 {
01412         write_batch_monitor_out = -1;
01413         write_batch_monitor_in = -1;
01414 }


変数

int bwlimit

options.c103 行で定義されています。

参照元 server_options()sleep_for_bwlimit().

size_t bwlimit_writemax

options.c105 行で定義されています。

参照元 sleep_for_bwlimit()writefd_unbuffered().

int io_timeout

options.c84 行で定義されています。

int allowed_lull

options.c85 行で定義されています。

int am_server

options.c75 行で定義されています。

int am_daemon

options.c94 行で定義されています。

int am_sender

options.c76 行で定義されています。

int am_generator

options.c77 行で定義されています。

参照元 do_recv()handle_stats()read_msg_fd()who_am_i()writefd_unbuffered().

int eol_nulls

options.c90 行で定義されています。

int read_batch

options.c139 行で定義されています。

int csum_length

checksum.c22 行で定義されています。

int checksum_seed

options.c115 行で定義されています。

int protocol_version

mdfour.c209 行で定義されています。

int remove_sent_files

options.c68 行で定義されています。

int preserve_hard_links

options.c47 行で定義されています。

char* filesfrom_host

options.c89 行で定義されています。

参照元 client_run()read_filesfrom_line()server_options()start_client().

struct stats stats

log.c59 行で定義されています。

struct file_list* the_file_list

main.c66 行で定義されています。

const char phase_unknown[] = "unknown"

io.c61 行で定義されています。

int ignore_timeout = 0

io.c62 行で定義されています。

参照元 check_timeout()generate_files()writefd_unbuffered().

int batch_fd = -1

io.c63 行で定義されています。

参照元 do_cmd()handle_stats()main()readfd()send_files()start_write_batch()writefd().

int batch_gen_fd = -1

io.c64 行で定義されています。

参照元 do_cmd()recv_files().

int kluge_around_eof = 0

io.c67 行で定義されています。

int msg_fd_in = -1

io.c69 行で定義されています。

参照元 read_msg_fd()set_msg_fd_in()writefd_unbuffered().

int msg_fd_out = -1

io.c70 行で定義されています。

参照元 msg2genr_flush()read_timeout()rwrite()send_msg()set_msg_fd_out()writefd().

int sock_f_in = -1

io.c71 行で定義されています。

参照元 io_set_sock_fds()read_timeout()readfd()readfd_unbuffered()whine_about_eof()writefd_unbuffered().

int sock_f_out = -1

io.c72 行で定義されています。

参照元 io_flush(), io_set_sock_fds(), itemize(), maybe_send_keepalive(), mplex_write(), msg2sndr_flush(), writefd(), writefd_unbuffered().

int io_multiplexing_out [static]

io.c74 行で定義されています。

参照元 close_multiplexing_out(), io_end_buffering(), io_flush(), io_multiplex_write(), io_start_multiplex_out(), msg2sndr_flush().

int io_multiplexing_in [static]

io.c75 行で定義されています。

参照元 close_multiplexing_in(), io_start_multiplex_in(), readfd_unbuffered(), writefd_unbuffered().

time_t last_io_in [static]

io.c76 行で定義されています。

参照元 check_timeout(), read_timeout().

time_t last_io_out [static]

io.c77 行で定義されています。

参照元 maybe_flush_socket(), maybe_send_keepalive(), writefd_unbuffered().

int no_flush [static]

io.c78 行で定義されています。

参照元 io_flush(), writefd_unbuffered().

int write_batch_monitor_in = -1 [static]

io.c80 行で定義されています。

参照元 readfd(), start_write_batch(), stop_write_batch().

int write_batch_monitor_out = -1 [static]

io.c81 行で定義されています。

参照元 start_write_batch(), stop_write_batch(), writefd().

int io_filesfrom_f_in = -1 [static]

io.c83 行で定義されています。

参照元 io_set_filesfrom_fds(), read_timeout().

int io_filesfrom_f_out = -1 [static]

io.c84 行で定義されています。

参照元 io_set_filesfrom_fds(), read_timeout().

char io_filesfrom_buf[2048] [static]

io.c85 行で定義されています。

参照元 io_set_filesfrom_fds(), read_timeout(), writefd_unbuffered().

char* io_filesfrom_bp [static]

io.c86 行で定義されています。

参照元 io_set_filesfrom_fds(), read_timeout().

char io_filesfrom_lastchar [static]

io.c87 行で定義されています。

参照元 io_set_filesfrom_fds(), read_timeout().

int io_filesfrom_buflen [static]

io.c88 行で定義されています。

参照元 io_set_filesfrom_fds(), read_timeout().

int defer_forwarding_messages = 0 [static]

io.c89 行で定義されています。

参照元 mplex_write(), msg2sndr_flush(), read_msg_fd(), writefd_unbuffered().

int select_timeout = SELECT_TIMEOUT [static]

io.c90 行で定義されています。

参照元 msg2genr_flush(), read_filesfrom_line(), read_timeout(), set_io_timeout(), writefd_unbuffered().

int active_filecnt = 0 [static]

io.c91 行で定義されています。

参照元 decrement_active_files(), increment_active_files().

OFF_T active_bytecnt = 0 [static]

io.c92 行で定義されています。

参照元 decrement_active_files(), increment_active_files().

struct flist_ndx_list redo_list, hlink_list [static]

io.c105 行で定義されています。

struct msg_list msg2genr, msg2sndr [static]

io.c117 行で定義されています。

参照元 msg2sndr_flush(), read_msg_fd().

char* iobuf_out [static]

io.c675 行で定義されています。

参照元 io_end_buffering(), io_flush(), io_start_buffering_out(), maybe_flush_socket(), maybe_send_keepalive(), writefd().

int iobuf_out_cnt [static]

io.c676 行で定義されています。

参照元 io_flush(), io_start_buffering_out(), maybe_flush_socket(), maybe_send_keepalive(), writefd().

char* iobuf_in [static]

io.c687 行で定義されています。

参照元 io_start_buffering_in(), readfd_unbuffered().

size_t iobuf_in_siz [static]

io.c688 行で定義されています。

参照元 io_start_buffering_in(), readfd_unbuffered().


rsyncに対してSat Dec 5 19:45:44 2009に生成されました。  doxygen 1.4.7