00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include "rsync.h"
00038
00039
00040 #define SELECT_TIMEOUT 60
00041
00042 extern int bwlimit;
00043 extern size_t bwlimit_writemax;
00044 extern int io_timeout;
00045 extern int allowed_lull;
00046 extern int am_server;
00047 extern int am_daemon;
00048 extern int am_sender;
00049 extern int am_generator;
00050 extern int eol_nulls;
00051 extern int read_batch;
00052 extern int csum_length;
00053 extern int checksum_seed;
00054 extern int protocol_version;
00055 extern int remove_sent_files;
00056 extern int preserve_hard_links;
00057 extern char *filesfrom_host;
00058 extern struct stats stats;
00059 extern struct file_list *the_file_list;
00060
00061 const char phase_unknown[] = "unknown";
00062 int ignore_timeout = 0;
00063 int batch_fd = -1;
00064 int batch_gen_fd = -1;
00065
00066
00067 int kluge_around_eof = 0;
00068
00069 int msg_fd_in = -1;
00070 int msg_fd_out = -1;
00071 int sock_f_in = -1;
00072 int sock_f_out = -1;
00073
00074 static int io_multiplexing_out;
00075 static int io_multiplexing_in;
00076 static time_t last_io_in;
00077 static time_t last_io_out;
00078 static int no_flush;
00079
00080 static int write_batch_monitor_in = -1;
00081 static int write_batch_monitor_out = -1;
00082
00083 static int io_filesfrom_f_in = -1;
00084 static int io_filesfrom_f_out = -1;
00085 static char io_filesfrom_buf[2048];
00086 static char *io_filesfrom_bp;
00087 static char io_filesfrom_lastchar;
00088 static int io_filesfrom_buflen;
00089 static int defer_forwarding_messages = 0;
00090 static int select_timeout = SELECT_TIMEOUT;
00091 static int active_filecnt = 0;
00092 static OFF_T active_bytecnt = 0;
00093
00094 static void read_loop(int fd, char *buf, size_t len);
00095
/* One queued file-list index. */
struct flist_ndx_item {
	struct flist_ndx_item *next;
	int ndx;
};

/* FIFO of file-list indices; head and tail are both NULL when empty. */
struct flist_ndx_list {
	struct flist_ndx_item *head;
	struct flist_ndx_item *tail;
};

static struct flist_ndx_list redo_list;
static struct flist_ndx_list hlink_list;

/* One queued message.  "len" counts the bytes stored in "buf" (4-byte
 * header plus payload); msg_list_add() over-allocates the item so that
 * "buf" can hold them all. */
struct msg_list_item {
	struct msg_list_item *next;
	int len;
	char buf[1];
};

/* FIFO of queued messages; head and tail are both NULL when empty. */
struct msg_list {
	struct msg_list_item *head;
	struct msg_list_item *tail;
};

static struct msg_list msg2genr;
static struct msg_list msg2sndr;
00118
00119 static void flist_ndx_push(struct flist_ndx_list *lp, int ndx)
00120 {
00121 struct flist_ndx_item *item;
00122
00123 if (!(item = new(struct flist_ndx_item)))
00124 out_of_memory("flist_ndx_push");
00125 item->next = NULL;
00126 item->ndx = ndx;
00127 if (lp->tail)
00128 lp->tail->next = item;
00129 else
00130 lp->head = item;
00131 lp->tail = item;
00132 }
00133
00134 static int flist_ndx_pop(struct flist_ndx_list *lp)
00135 {
00136 struct flist_ndx_item *next;
00137 int ndx;
00138
00139 if (!lp->head)
00140 return -1;
00141
00142 ndx = lp->head->ndx;
00143 next = lp->head->next;
00144 free(lp->head);
00145 lp->head = next;
00146 if (!next)
00147 lp->tail = NULL;
00148
00149 return ndx;
00150 }
00151
00152 static void check_timeout(void)
00153 {
00154 time_t t;
00155
00156 if (!io_timeout || ignore_timeout)
00157 return;
00158
00159 if (!last_io_in) {
00160 last_io_in = time(NULL);
00161 return;
00162 }
00163
00164 t = time(NULL);
00165
00166 if (t - last_io_in >= io_timeout) {
00167 if (!am_server && !am_daemon) {
00168 rprintf(FERROR, "io timeout after %d seconds -- exiting\n",
00169 (int)(t-last_io_in));
00170 }
00171 exit_cleanup(RERR_TIMEOUT);
00172 }
00173 }
00174
00175
00176
00177 void io_set_sock_fds(int f_in, int f_out)
00178 {
00179 sock_f_in = f_in;
00180 sock_f_out = f_out;
00181 }
00182
00183 void set_io_timeout(int secs)
00184 {
00185 io_timeout = secs;
00186
00187 if (!io_timeout || io_timeout > SELECT_TIMEOUT)
00188 select_timeout = SELECT_TIMEOUT;
00189 else
00190 select_timeout = io_timeout;
00191
00192 allowed_lull = read_batch ? 0 : (io_timeout + 1) / 2;
00193 }
00194
00195
00196
00197
00198
00199 void set_msg_fd_in(int fd)
00200 {
00201 msg_fd_in = fd;
00202 }
00203
00204
00205
00206 void set_msg_fd_out(int fd)
00207 {
00208 msg_fd_out = fd;
00209 set_nonblocking(msg_fd_out);
00210 }
00211
00212
00213 static void msg_list_add(struct msg_list *lst, int code, char *buf, int len)
00214 {
00215 struct msg_list_item *m;
00216 int sz = len + 4 + sizeof m[0] - 1;
00217
00218 if (!(m = (struct msg_list_item *)new_array(char, sz)))
00219 out_of_memory("msg_list_add");
00220 m->next = NULL;
00221 m->len = len + 4;
00222 SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len);
00223 memcpy(m->buf + 4, buf, len);
00224 if (lst->tail)
00225 lst->tail->next = m;
00226 else
00227 lst->head = m;
00228 lst->tail = m;
00229 }
00230
00231
00232
00233
00234
00235 static void read_msg_fd(void)
00236 {
00237 char buf[2048];
00238 size_t n;
00239 int fd = msg_fd_in;
00240 int tag, len;
00241
00242
00243
00244 msg_fd_in = -1;
00245
00246 read_loop(fd, buf, 4);
00247 tag = IVAL(buf, 0);
00248
00249 len = tag & 0xFFFFFF;
00250 tag = (tag >> 24) - MPLEX_BASE;
00251
00252 switch (tag) {
00253 case MSG_DONE:
00254 if (len != 0 || !am_generator) {
00255 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
00256 exit_cleanup(RERR_STREAMIO);
00257 }
00258 flist_ndx_push(&redo_list, -1);
00259 break;
00260 case MSG_REDO:
00261 if (len != 4 || !am_generator) {
00262 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
00263 exit_cleanup(RERR_STREAMIO);
00264 }
00265 read_loop(fd, buf, 4);
00266 if (remove_sent_files)
00267 decrement_active_files(IVAL(buf,0));
00268 flist_ndx_push(&redo_list, IVAL(buf,0));
00269 break;
00270 case MSG_DELETED:
00271 if (len >= (int)sizeof buf || !am_generator) {
00272 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
00273 exit_cleanup(RERR_STREAMIO);
00274 }
00275 read_loop(fd, buf, len);
00276 if (defer_forwarding_messages)
00277 msg_list_add(&msg2sndr, MSG_DELETED, buf, len);
00278 else
00279 io_multiplex_write(MSG_DELETED, buf, len);
00280 break;
00281 case MSG_SUCCESS:
00282 if (len != 4 || !am_generator) {
00283 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
00284 exit_cleanup(RERR_STREAMIO);
00285 }
00286 read_loop(fd, buf, len);
00287 if (remove_sent_files) {
00288 decrement_active_files(IVAL(buf,0));
00289 if (defer_forwarding_messages)
00290 msg_list_add(&msg2sndr, MSG_SUCCESS, buf, len);
00291 else
00292 io_multiplex_write(MSG_SUCCESS, buf, len);
00293 }
00294 if (preserve_hard_links)
00295 flist_ndx_push(&hlink_list, IVAL(buf,0));
00296 break;
00297 case MSG_SOCKERR:
00298 if (!am_generator) {
00299 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
00300 exit_cleanup(RERR_STREAMIO);
00301 }
00302 close_multiplexing_out();
00303
00304 case MSG_INFO:
00305 case MSG_ERROR:
00306 case MSG_LOG:
00307 while (len) {
00308 n = len;
00309 if (n >= sizeof buf)
00310 n = sizeof buf - 1;
00311 read_loop(fd, buf, n);
00312 if (am_generator && am_server && defer_forwarding_messages)
00313 msg_list_add(&msg2sndr, tag, buf, n);
00314 else
00315 rwrite((enum logcode)tag, buf, n);
00316 len -= n;
00317 }
00318 break;
00319 default:
00320 rprintf(FERROR, "unknown message %d:%d [%s]\n",
00321 tag, len, who_am_i());
00322 exit_cleanup(RERR_STREAMIO);
00323 }
00324
00325 msg_fd_in = fd;
00326 }
00327
00328
00329
00330
00331 void increment_active_files(int ndx, int itemizing, enum logcode code)
00332 {
00333
00334 while (active_filecnt >= (active_bytecnt >= 128*1024 ? 10 : 50)) {
00335 if (hlink_list.head)
00336 check_for_finished_hlinks(itemizing, code);
00337 read_msg_fd();
00338 }
00339
00340 active_filecnt++;
00341 active_bytecnt += the_file_list->files[ndx]->length;
00342 }
00343
00344 void decrement_active_files(int ndx)
00345 {
00346 active_filecnt--;
00347 active_bytecnt -= the_file_list->files[ndx]->length;
00348 }
00349
00350
00351
00352
00353 static int msg2genr_flush(int flush_it_all)
00354 {
00355 static int written = 0;
00356 struct timeval tv;
00357 fd_set fds;
00358
00359 if (msg_fd_out < 0)
00360 return -1;
00361
00362 while (msg2genr.head) {
00363 struct msg_list_item *m = msg2genr.head;
00364 int n = write(msg_fd_out, m->buf + written, m->len - written);
00365 if (n < 0) {
00366 if (errno == EINTR)
00367 continue;
00368 if (errno != EWOULDBLOCK && errno != EAGAIN)
00369 return -1;
00370 if (!flush_it_all)
00371 return 0;
00372 FD_ZERO(&fds);
00373 FD_SET(msg_fd_out, &fds);
00374 tv.tv_sec = select_timeout;
00375 tv.tv_usec = 0;
00376 if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
00377 check_timeout();
00378 } else if ((written += n) == m->len) {
00379 msg2genr.head = m->next;
00380 if (!msg2genr.head)
00381 msg2genr.tail = NULL;
00382 free(m);
00383 written = 0;
00384 }
00385 }
00386 return 1;
00387 }
00388
00389 void send_msg(enum msgcode code, char *buf, int len)
00390 {
00391 if (msg_fd_out < 0) {
00392 io_multiplex_write(code, buf, len);
00393 return;
00394 }
00395 msg_list_add(&msg2genr, code, buf, len);
00396 msg2genr_flush(NORMAL_FLUSH);
00397 }
00398
00399 int get_redo_num(int itemizing, enum logcode code)
00400 {
00401 while (1) {
00402 if (hlink_list.head)
00403 check_for_finished_hlinks(itemizing, code);
00404 if (redo_list.head)
00405 break;
00406 read_msg_fd();
00407 }
00408
00409 return flist_ndx_pop(&redo_list);
00410 }
00411
00412 int get_hlink_num(void)
00413 {
00414 return flist_ndx_pop(&hlink_list);
00415 }
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427 void io_set_filesfrom_fds(int f_in, int f_out)
00428 {
00429 io_filesfrom_f_in = f_in;
00430 io_filesfrom_f_out = f_out;
00431 io_filesfrom_bp = io_filesfrom_buf;
00432 io_filesfrom_lastchar = '\0';
00433 io_filesfrom_buflen = 0;
00434 }
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448 static void whine_about_eof(int fd)
00449 {
00450 if (kluge_around_eof && fd == sock_f_in) {
00451 int i;
00452 if (kluge_around_eof > 0)
00453 exit_cleanup(0);
00454
00455 for (i = 10*1000/20; i--; )
00456 msleep(20);
00457 }
00458
00459 rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
00460 "(%.0f bytes received so far) [%s]\n",
00461 (double)stats.total_read, who_am_i());
00462
00463 exit_cleanup(RERR_STREAMIO);
00464 }
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477 static int read_timeout(int fd, char *buf, size_t len)
00478 {
00479 int n, cnt = 0;
00480
00481 io_flush(NORMAL_FLUSH);
00482
00483 while (cnt == 0) {
00484
00485 fd_set r_fds, w_fds;
00486 struct timeval tv;
00487 int maxfd = fd;
00488 int count;
00489
00490 FD_ZERO(&r_fds);
00491 FD_ZERO(&w_fds);
00492 FD_SET(fd, &r_fds);
00493 if (msg2genr.head) {
00494 FD_SET(msg_fd_out, &w_fds);
00495 if (msg_fd_out > maxfd)
00496 maxfd = msg_fd_out;
00497 }
00498 if (io_filesfrom_f_out >= 0) {
00499 int new_fd;
00500 if (io_filesfrom_buflen == 0) {
00501 if (io_filesfrom_f_in >= 0) {
00502 FD_SET(io_filesfrom_f_in, &r_fds);
00503 new_fd = io_filesfrom_f_in;
00504 } else {
00505 io_filesfrom_f_out = -1;
00506 new_fd = -1;
00507 }
00508 } else {
00509 FD_SET(io_filesfrom_f_out, &w_fds);
00510 new_fd = io_filesfrom_f_out;
00511 }
00512 if (new_fd > maxfd)
00513 maxfd = new_fd;
00514 }
00515
00516 tv.tv_sec = select_timeout;
00517 tv.tv_usec = 0;
00518
00519 errno = 0;
00520
00521 count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
00522
00523 if (count <= 0) {
00524 if (errno == EBADF)
00525 exit_cleanup(RERR_SOCKETIO);
00526 check_timeout();
00527 continue;
00528 }
00529
00530 if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds))
00531 msg2genr_flush(NORMAL_FLUSH);
00532
00533 if (io_filesfrom_f_out >= 0) {
00534 if (io_filesfrom_buflen) {
00535 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
00536 int l = write(io_filesfrom_f_out,
00537 io_filesfrom_bp,
00538 io_filesfrom_buflen);
00539 if (l > 0) {
00540 if (!(io_filesfrom_buflen -= l))
00541 io_filesfrom_bp = io_filesfrom_buf;
00542 else
00543 io_filesfrom_bp += l;
00544 } else {
00545
00546 io_filesfrom_f_out = -1;
00547 }
00548 }
00549 } else if (io_filesfrom_f_in >= 0) {
00550 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
00551 int l = read(io_filesfrom_f_in,
00552 io_filesfrom_buf,
00553 sizeof io_filesfrom_buf);
00554 if (l <= 0) {
00555
00556 io_filesfrom_buf[0] = '\0';
00557 io_filesfrom_buf[1] = '\0';
00558 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
00559 io_filesfrom_f_in = -1;
00560 } else {
00561 if (!eol_nulls) {
00562 char *s = io_filesfrom_buf + l;
00563
00564 while (s-- > io_filesfrom_buf) {
00565 if (*s == '\n' || *s == '\r')
00566 *s = '\0';
00567 }
00568 }
00569 if (!io_filesfrom_lastchar) {
00570
00571
00572 while (l && !*io_filesfrom_bp)
00573 io_filesfrom_bp++, l--;
00574 }
00575 if (!l)
00576 io_filesfrom_bp = io_filesfrom_buf;
00577 else {
00578 char *f = io_filesfrom_bp;
00579 char *t = f;
00580 char *eob = f + l;
00581
00582 while (f != eob) {
00583 if (!(*t++ = *f++)) {
00584 while (f != eob && !*f)
00585 f++, l--;
00586 }
00587 }
00588 io_filesfrom_lastchar = f[-1];
00589 }
00590 io_filesfrom_buflen = l;
00591 }
00592 }
00593 }
00594 }
00595
00596 if (!FD_ISSET(fd, &r_fds))
00597 continue;
00598
00599 n = read(fd, buf, len);
00600
00601 if (n <= 0) {
00602 if (n == 0)
00603 whine_about_eof(fd);
00604 if (errno == EINTR || errno == EWOULDBLOCK
00605 || errno == EAGAIN)
00606 continue;
00607
00608
00609 if (fd == sock_f_in) {
00610 close_multiplexing_out();
00611 rsyserr(FSOCKERR, errno, "read error");
00612 } else
00613 rsyserr(FERROR, errno, "read error");
00614 exit_cleanup(RERR_STREAMIO);
00615 }
00616
00617 buf += n;
00618 len -= n;
00619 cnt += n;
00620
00621 if (fd == sock_f_in && io_timeout)
00622 last_io_in = time(NULL);
00623 }
00624
00625 return cnt;
00626 }
00627
00628
00629
00630
00631
00632 int read_filesfrom_line(int fd, char *fname)
00633 {
00634 char ch, *s, *eob = fname + MAXPATHLEN - 1;
00635 int cnt;
00636 int reading_remotely = filesfrom_host != NULL;
00637 int nulls = eol_nulls || reading_remotely;
00638
00639 start:
00640 s = fname;
00641 while (1) {
00642 cnt = read(fd, &ch, 1);
00643 if (cnt < 0 && (errno == EWOULDBLOCK
00644 || errno == EINTR || errno == EAGAIN)) {
00645 struct timeval tv;
00646 fd_set fds;
00647 FD_ZERO(&fds);
00648 FD_SET(fd, &fds);
00649 tv.tv_sec = select_timeout;
00650 tv.tv_usec = 0;
00651 if (!select(fd+1, &fds, NULL, NULL, &tv))
00652 check_timeout();
00653 continue;
00654 }
00655 if (cnt != 1)
00656 break;
00657 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
00658
00659 if (!reading_remotely && s == fname)
00660 continue;
00661 break;
00662 }
00663 if (s < eob)
00664 *s++ = ch;
00665 }
00666 *s = '\0';
00667
00668
00669 if (*fname == '#' || *fname == ';')
00670 goto start;
00671
00672 return s - fname;
00673 }
00674
00675 static char *iobuf_out;
00676 static int iobuf_out_cnt;
00677
00678 void io_start_buffering_out(void)
00679 {
00680 if (iobuf_out)
00681 return;
00682 if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
00683 out_of_memory("io_start_buffering_out");
00684 iobuf_out_cnt = 0;
00685 }
00686
00687 static char *iobuf_in;
00688 static size_t iobuf_in_siz;
00689
00690 void io_start_buffering_in(void)
00691 {
00692 if (iobuf_in)
00693 return;
00694 iobuf_in_siz = 2 * IO_BUFFER_SIZE;
00695 if (!(iobuf_in = new_array(char, iobuf_in_siz)))
00696 out_of_memory("io_start_buffering_in");
00697 }
00698
00699 void io_end_buffering(void)
00700 {
00701 io_flush(NORMAL_FLUSH);
00702 if (!io_multiplexing_out) {
00703 free(iobuf_out);
00704 iobuf_out = NULL;
00705 }
00706 }
00707
00708 void maybe_flush_socket(void)
00709 {
00710 if (iobuf_out && iobuf_out_cnt && time(NULL) - last_io_out >= 5)
00711 io_flush(NORMAL_FLUSH);
00712 }
00713
00714 void maybe_send_keepalive(void)
00715 {
00716 if (time(NULL) - last_io_out >= allowed_lull) {
00717 if (!iobuf_out || !iobuf_out_cnt) {
00718 if (protocol_version < 29)
00719 return;
00720 write_int(sock_f_out, the_file_list->count);
00721 write_shortint(sock_f_out, ITEM_IS_NEW);
00722 }
00723 if (iobuf_out)
00724 io_flush(NORMAL_FLUSH);
00725 }
00726 }
00727
00728
00729
00730
00731
/* Keep calling read_timeout() until exactly "len" bytes have been
 * stored in "buf". */
static void read_loop(int fd, char *buf, size_t len)
{
	size_t left = len;

	while (left != 0) {
		int got = read_timeout(fd, buf, left);

		left -= got;
		buf += got;
	}
}
00741
00742
00743
00744
00745
00746
00747
00748 static int readfd_unbuffered(int fd, char *buf, size_t len)
00749 {
00750 static size_t remaining;
00751 static size_t iobuf_in_ndx;
00752 size_t msg_bytes;
00753 int tag, cnt = 0;
00754 char line[BIGPATHBUFLEN];
00755
00756 if (!iobuf_in || fd != sock_f_in)
00757 return read_timeout(fd, buf, len);
00758
00759 if (!io_multiplexing_in && remaining == 0) {
00760 remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
00761 iobuf_in_ndx = 0;
00762 }
00763
00764 while (cnt == 0) {
00765 if (remaining) {
00766 len = MIN(len, remaining);
00767 memcpy(buf, iobuf_in + iobuf_in_ndx, len);
00768 iobuf_in_ndx += len;
00769 remaining -= len;
00770 cnt = len;
00771 break;
00772 }
00773
00774 read_loop(fd, line, 4);
00775 tag = IVAL(line, 0);
00776
00777 msg_bytes = tag & 0xFFFFFF;
00778 tag = (tag >> 24) - MPLEX_BASE;
00779
00780 switch (tag) {
00781 case MSG_DATA:
00782 if (msg_bytes > iobuf_in_siz) {
00783 if (!(iobuf_in = realloc_array(iobuf_in, char,
00784 msg_bytes)))
00785 out_of_memory("readfd_unbuffered");
00786 iobuf_in_siz = msg_bytes;
00787 }
00788 read_loop(fd, iobuf_in, msg_bytes);
00789 remaining = msg_bytes;
00790 iobuf_in_ndx = 0;
00791 break;
00792 case MSG_DELETED:
00793 if (msg_bytes >= sizeof line)
00794 goto overflow;
00795 read_loop(fd, line, msg_bytes);
00796
00797 if (msg_bytes > 0 && !line[msg_bytes-1])
00798 log_delete(line, S_IFDIR);
00799 else {
00800 line[msg_bytes] = '\0';
00801 log_delete(line, S_IFREG);
00802 }
00803 break;
00804 case MSG_SUCCESS:
00805 if (msg_bytes != 4) {
00806 rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
00807 tag, (long)msg_bytes, who_am_i());
00808 exit_cleanup(RERR_STREAMIO);
00809 }
00810 read_loop(fd, line, msg_bytes);
00811 successful_send(IVAL(line, 0));
00812 break;
00813 case MSG_INFO:
00814 case MSG_ERROR:
00815 if (msg_bytes >= sizeof line) {
00816 overflow:
00817 rprintf(FERROR,
00818 "multiplexing overflow %d:%ld [%s]\n",
00819 tag, (long)msg_bytes, who_am_i());
00820 exit_cleanup(RERR_STREAMIO);
00821 }
00822 read_loop(fd, line, msg_bytes);
00823 rwrite((enum logcode)tag, line, msg_bytes);
00824 break;
00825 default:
00826 rprintf(FERROR, "unexpected tag %d [%s]\n",
00827 tag, who_am_i());
00828 exit_cleanup(RERR_STREAMIO);
00829 }
00830 }
00831
00832 if (remaining == 0)
00833 io_flush(NORMAL_FLUSH);
00834
00835 return cnt;
00836 }
00837
00838
00839
00840
00841
00842
00843 static void readfd(int fd, char *buffer, size_t N)
00844 {
00845 int cnt;
00846 size_t total = 0;
00847
00848 while (total < N) {
00849 cnt = readfd_unbuffered(fd, buffer + total, N-total);
00850 total += cnt;
00851 }
00852
00853 if (fd == write_batch_monitor_in) {
00854 if ((size_t)write(batch_fd, buffer, total) != total)
00855 exit_cleanup(RERR_FILEIO);
00856 }
00857
00858 if (fd == sock_f_in)
00859 stats.total_read += total;
00860 }
00861
00862 int read_shortint(int f)
00863 {
00864 uchar b[2];
00865 readfd(f, (char *)b, 2);
00866 return (b[1] << 8) + b[0];
00867 }
00868
00869 int32 read_int(int f)
00870 {
00871 char b[4];
00872 int32 num;
00873
00874 readfd(f,b,4);
00875 num = IVAL(b,0);
00876 if (num == (int32)0xffffffff)
00877 return -1;
00878 return num;
00879 }
00880
00881 int64 read_longint(int f)
00882 {
00883 int64 num;
00884 char b[8];
00885 num = read_int(f);
00886
00887 if ((int32)num != (int32)0xffffffff)
00888 return num;
00889
00890 #if SIZEOF_INT64 < 8
00891 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
00892 exit_cleanup(RERR_UNSUPPORTED);
00893 #else
00894 readfd(f,b,8);
00895 num = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
00896 #endif
00897
00898 return num;
00899 }
00900
/* Read exactly "len" bytes from "f" into "buf". */
void read_buf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
}
00905
/* Read exactly "len" bytes from "f" into "buf" and NUL-terminate them.
 * "buf" must therefore have room for len + 1 bytes. */
void read_sbuf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
	*(buf + len) = '\0';
}
00911
00912 uchar read_byte(int f)
00913 {
00914 uchar c;
00915 readfd(f, (char *)&c, 1);
00916 return c;
00917 }
00918
00919 int read_vstring(int f, char *buf, int bufsize)
00920 {
00921 int len = read_byte(f);
00922
00923 if (len & 0x80)
00924 len = (len & ~0x80) * 0x100 + read_byte(f);
00925
00926 if (len >= bufsize) {
00927 rprintf(FERROR, "over-long vstring received (%d > %d)\n",
00928 len, bufsize - 1);
00929 return -1;
00930 }
00931
00932 if (len)
00933 readfd(f, buf, len);
00934 buf[len] = '\0';
00935 return len;
00936 }
00937
00938
00939
00940 void read_sum_head(int f, struct sum_struct *sum)
00941 {
00942 sum->count = read_int(f);
00943 if (sum->count < 0) {
00944 rprintf(FERROR, "Invalid checksum count %ld [%s]\n",
00945 (long)sum->count, who_am_i());
00946 exit_cleanup(RERR_PROTOCOL);
00947 }
00948 sum->blength = read_int(f);
00949 if (sum->blength < 0 || sum->blength > MAX_BLOCK_SIZE) {
00950 rprintf(FERROR, "Invalid block length %ld [%s]\n",
00951 (long)sum->blength, who_am_i());
00952 exit_cleanup(RERR_PROTOCOL);
00953 }
00954 sum->s2length = protocol_version < 27 ? csum_length : (int)read_int(f);
00955 if (sum->s2length < 0 || sum->s2length > MD4_SUM_LENGTH) {
00956 rprintf(FERROR, "Invalid checksum length %d [%s]\n",
00957 sum->s2length, who_am_i());
00958 exit_cleanup(RERR_PROTOCOL);
00959 }
00960 sum->remainder = read_int(f);
00961 if (sum->remainder < 0 || sum->remainder > sum->blength) {
00962 rprintf(FERROR, "Invalid remainder length %ld [%s]\n",
00963 (long)sum->remainder, who_am_i());
00964 exit_cleanup(RERR_PROTOCOL);
00965 }
00966 }
00967
00968
00969
00970
00971 void write_sum_head(int f, struct sum_struct *sum)
00972 {
00973 static struct sum_struct null_sum;
00974
00975 if (sum == NULL)
00976 sum = &null_sum;
00977
00978 write_int(f, sum->count);
00979 write_int(f, sum->blength);
00980 if (protocol_version >= 27)
00981 write_int(f, sum->s2length);
00982 write_int(f, sum->remainder);
00983 }
00984
00985
00986
00987
00988
00989
00990
00991
00992
00993
00994
00995
00996
00997
00998
00999
01000
01001 static void sleep_for_bwlimit(int bytes_written)
01002 {
01003 static struct timeval prior_tv;
01004 static long total_written = 0;
01005 struct timeval tv, start_tv;
01006 long elapsed_usec, sleep_usec;
01007
01008 #define ONE_SEC 1000000L
01009
01010 if (!bwlimit_writemax)
01011 return;
01012
01013 total_written += bytes_written;
01014
01015 gettimeofday(&start_tv, NULL);
01016 if (prior_tv.tv_sec) {
01017 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
01018 + (start_tv.tv_usec - prior_tv.tv_usec);
01019 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
01020 if (total_written < 0)
01021 total_written = 0;
01022 }
01023
01024 sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
01025 if (sleep_usec < ONE_SEC / 10) {
01026 prior_tv = start_tv;
01027 return;
01028 }
01029
01030 tv.tv_sec = sleep_usec / ONE_SEC;
01031 tv.tv_usec = sleep_usec % ONE_SEC;
01032 select(0, NULL, NULL, NULL, &tv);
01033
01034 gettimeofday(&prior_tv, NULL);
01035 elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
01036 + (prior_tv.tv_usec - start_tv.tv_usec);
01037 total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
01038 }
01039
01040
01041
01042
01043
01044
01045
01046 static void writefd_unbuffered(int fd,char *buf,size_t len)
01047 {
01048 size_t n, total = 0;
01049 fd_set w_fds, r_fds;
01050 int maxfd, count, cnt, using_r_fds;
01051 int defer_save = defer_forwarding_messages;
01052 struct timeval tv;
01053
01054 no_flush++;
01055
01056 while (total < len) {
01057 FD_ZERO(&w_fds);
01058 FD_SET(fd,&w_fds);
01059 maxfd = fd;
01060
01061 if (msg_fd_in >= 0) {
01062 FD_ZERO(&r_fds);
01063 FD_SET(msg_fd_in,&r_fds);
01064 if (msg_fd_in > maxfd)
01065 maxfd = msg_fd_in;
01066 using_r_fds = 1;
01067 } else
01068 using_r_fds = 0;
01069
01070 tv.tv_sec = select_timeout;
01071 tv.tv_usec = 0;
01072
01073 errno = 0;
01074 count = select(maxfd + 1, using_r_fds ? &r_fds : NULL,
01075 &w_fds, NULL, &tv);
01076
01077 if (count <= 0) {
01078 if (count < 0 && errno == EBADF)
01079 exit_cleanup(RERR_SOCKETIO);
01080 check_timeout();
01081 continue;
01082 }
01083
01084 if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds))
01085 read_msg_fd();
01086
01087 if (!FD_ISSET(fd, &w_fds))
01088 continue;
01089
01090 n = len - total;
01091 if (bwlimit_writemax && n > bwlimit_writemax)
01092 n = bwlimit_writemax;
01093 cnt = write(fd, buf + total, n);
01094
01095 if (cnt <= 0) {
01096 if (cnt < 0) {
01097 if (errno == EINTR)
01098 continue;
01099 if (errno == EWOULDBLOCK || errno == EAGAIN) {
01100 msleep(1);
01101 continue;
01102 }
01103 }
01104
01105
01106 if (fd == sock_f_out)
01107 close_multiplexing_out();
01108 rsyserr(FERROR, errno,
01109 "writefd_unbuffered failed to write %ld bytes [%s]",
01110 (long)len, who_am_i());
01111
01112
01113 while (fd == sock_f_out && io_multiplexing_in) {
01114 set_io_timeout(30);
01115 ignore_timeout = 0;
01116 readfd_unbuffered(sock_f_in, io_filesfrom_buf,
01117 sizeof io_filesfrom_buf);
01118 }
01119 exit_cleanup(RERR_STREAMIO);
01120 }
01121
01122 total += cnt;
01123 defer_forwarding_messages = 1;
01124
01125 if (fd == sock_f_out) {
01126 if (io_timeout || am_generator)
01127 last_io_out = time(NULL);
01128 sleep_for_bwlimit(cnt);
01129 }
01130 }
01131
01132 defer_forwarding_messages = defer_save;
01133 no_flush--;
01134 }
01135
01136 static void msg2sndr_flush(void)
01137 {
01138 if (defer_forwarding_messages)
01139 return;
01140
01141 while (msg2sndr.head && io_multiplexing_out) {
01142 struct msg_list_item *m = msg2sndr.head;
01143 if (!(msg2sndr.head = m->next))
01144 msg2sndr.tail = NULL;
01145 stats.total_written += m->len;
01146 defer_forwarding_messages = 1;
01147 writefd_unbuffered(sock_f_out, m->buf, m->len);
01148 defer_forwarding_messages = 0;
01149 free(m);
01150 }
01151 }
01152
01153
01154
01155
01156
01157 static void mplex_write(enum msgcode code, char *buf, size_t len)
01158 {
01159 char buffer[1024];
01160 size_t n = len;
01161
01162 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
01163
01164 if (n > sizeof buffer - 4)
01165 n = 0;
01166 else
01167 memcpy(buffer + 4, buf, n);
01168
01169 writefd_unbuffered(sock_f_out, buffer, n+4);
01170
01171 len -= n;
01172 buf += n;
01173
01174 if (len) {
01175 defer_forwarding_messages = 1;
01176 writefd_unbuffered(sock_f_out, buf, len);
01177 defer_forwarding_messages = 0;
01178 msg2sndr_flush();
01179 }
01180 }
01181
01182 void io_flush(int flush_it_all)
01183 {
01184 msg2genr_flush(flush_it_all);
01185 msg2sndr_flush();
01186
01187 if (!iobuf_out_cnt || no_flush)
01188 return;
01189
01190 if (io_multiplexing_out)
01191 mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
01192 else
01193 writefd_unbuffered(sock_f_out, iobuf_out, iobuf_out_cnt);
01194 iobuf_out_cnt = 0;
01195 }
01196
01197 static void writefd(int fd,char *buf,size_t len)
01198 {
01199 if (fd == msg_fd_out) {
01200 rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
01201 exit_cleanup(RERR_PROTOCOL);
01202 }
01203
01204 if (fd == sock_f_out)
01205 stats.total_written += len;
01206
01207 if (fd == write_batch_monitor_out) {
01208 if ((size_t)write(batch_fd, buf, len) != len)
01209 exit_cleanup(RERR_FILEIO);
01210 }
01211
01212 if (!iobuf_out || fd != sock_f_out) {
01213 writefd_unbuffered(fd, buf, len);
01214 return;
01215 }
01216
01217 while (len) {
01218 int n = MIN((int)len, IO_BUFFER_SIZE - iobuf_out_cnt);
01219 if (n > 0) {
01220 memcpy(iobuf_out+iobuf_out_cnt, buf, n);
01221 buf += n;
01222 len -= n;
01223 iobuf_out_cnt += n;
01224 }
01225
01226 if (iobuf_out_cnt == IO_BUFFER_SIZE)
01227 io_flush(NORMAL_FLUSH);
01228 }
01229 }
01230
01231 void write_shortint(int f, int x)
01232 {
01233 uchar b[2];
01234 b[0] = x;
01235 b[1] = x >> 8;
01236 writefd(f, (char *)b, 2);
01237 }
01238
01239 void write_int(int f,int32 x)
01240 {
01241 char b[4];
01242 SIVAL(b,0,x);
01243 writefd(f,b,4);
01244 }
01245
01246
01247
01248
01249
01250 void write_longint(int f, int64 x)
01251 {
01252 char b[8];
01253
01254 if (x <= 0x7FFFFFFF) {
01255 write_int(f, (int)x);
01256 return;
01257 }
01258
01259 #if SIZEOF_INT64 < 8
01260 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
01261 exit_cleanup(RERR_UNSUPPORTED);
01262 #else
01263 write_int(f, (int32)0xFFFFFFFF);
01264 SIVAL(b,0,(x&0xFFFFFFFF));
01265 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
01266
01267 writefd(f,b,8);
01268 #endif
01269 }
01270
/* Write "len" bytes from "buf" to "f". */
void write_buf(int f, char *buf, size_t len)
{
	writefd(f, buf, len);
}
01275
01276
/* Write the NUL-terminated string "buf" to "f", without its terminator. */
void write_sbuf(int f, char *buf)
{
	size_t len = strlen(buf);

	writefd(f, buf, len);
}
01281
01282 void write_byte(int f, uchar c)
01283 {
01284 writefd(f, (char *)&c, 1);
01285 }
01286
01287 void write_vstring(int f, char *str, int len)
01288 {
01289 uchar lenbuf[3], *lb = lenbuf;
01290
01291 if (len > 0x7F) {
01292 if (len > 0x7FFF) {
01293 rprintf(FERROR,
01294 "attempting to send over-long vstring (%d > %d)\n",
01295 len, 0x7FFF);
01296 exit_cleanup(RERR_PROTOCOL);
01297 }
01298 *lb++ = len / 0x100 + 0x80;
01299 }
01300 *lb = len;
01301
01302 writefd(f, (char*)lenbuf, lb - lenbuf + 1);
01303 if (len)
01304 writefd(f, str, len);
01305 }
01306
01307
01308
01309
01310
01311
01312
01313
/* Read a line from "f" into "buf", one byte at a time.
 *
 * Reading stops at '\n' (not stored) or after "maxlen" characters have
 * been stored; '\r' is never stored.  The result is NUL-terminated, so
 * "buf" needs room for maxlen + 1 bytes.  Returns 0 if a NUL byte is
 * read or if the limit was reached before a newline, otherwise 1. */
int read_line(int f, char *buf, size_t maxlen)
{
	for (; maxlen; ) {
		*buf = 0;
		read_buf(f, buf, 1);

		if (*buf == 0)
			return 0;
		if (*buf == '\n')
			break;
		if (*buf == '\r')
			continue;	/* overwritten by the next read */

		buf++;
		maxlen--;
	}

	*buf = '\0';
	return maxlen > 0;
}
01331
01332 void io_printf(int fd, const char *format, ...)
01333 {
01334 va_list ap;
01335 char buf[BIGPATHBUFLEN];
01336 int len;
01337
01338 va_start(ap, format);
01339 len = vsnprintf(buf, sizeof buf, format, ap);
01340 va_end(ap);
01341
01342 if (len < 0)
01343 exit_cleanup(RERR_STREAMIO);
01344
01345 if (len > (int)sizeof buf) {
01346 rprintf(FERROR, "io_printf() was too long for the buffer.\n");
01347 exit_cleanup(RERR_STREAMIO);
01348 }
01349
01350 write_sbuf(fd, buf);
01351 }
01352
01353
01354 void io_start_multiplex_out(void)
01355 {
01356 io_flush(NORMAL_FLUSH);
01357 io_start_buffering_out();
01358 io_multiplexing_out = 1;
01359 }
01360
01361
01362 void io_start_multiplex_in(void)
01363 {
01364 io_flush(NORMAL_FLUSH);
01365 io_start_buffering_in();
01366 io_multiplexing_in = 1;
01367 }
01368
01369
01370 int io_multiplex_write(enum msgcode code, char *buf, size_t len)
01371 {
01372 if (!io_multiplexing_out)
01373 return 0;
01374
01375 io_flush(NORMAL_FLUSH);
01376 stats.total_written += (len+4);
01377 mplex_write(code, buf, len);
01378 return 1;
01379 }
01380
01381 void close_multiplexing_in(void)
01382 {
01383 io_multiplexing_in = 0;
01384 }
01385
01386
01387 void close_multiplexing_out(void)
01388 {
01389 io_multiplexing_out = 0;
01390 }
01391
01392 void start_write_batch(int fd)
01393 {
01394 write_stream_flags(batch_fd);
01395
01396
01397
01398
01399
01400
01401 write_int(batch_fd, protocol_version);
01402 write_int(batch_fd, checksum_seed);
01403
01404 if (am_sender)
01405 write_batch_monitor_out = fd;
01406 else
01407 write_batch_monitor_in = fd;
01408 }
01409
01410 void stop_write_batch(void)
01411 {
01412 write_batch_monitor_out = -1;
01413 write_batch_monitor_in = -1;
01414 }