Re: [Rd] There was a problem by the use of snow.

From: Prof Brian Ripley <ripley_at_stats.ox.ac.uk>
Date: Mon, 25 Jun 2007 13:49:52 +0100 (BST)

We don't need to change as much as this. The OutBytes function is only called for small objects apart from CHARSXPs: the latter are limited to 2^31 - 1 bytes. So 'length' in a read/write will never exceed INT_MAX.

In the current usage, the in-memory code is only used to write to a RAWSXP which is also limited to 2^31 - 1 bytes, so we only need to deal with signed overflows if we do things like double the buffer size, and that's pointless as the result will overflow the RAWSXP.

On Mon, 25 Jun 2007, Ei-ji Nakama wrote:

> Thank you for advice.
>
> library(snow)
> cl <- makeSOCKcluster(c("localhost","localhost"))
> clusterCall(cl, function(x){Sys.sleep(10);1})
> clusterCall(cl, function(x){Sys.sleep(60);1}) # Timeout Code.
> stopCluster(cl)
>
> I worked in AIX and Linux.
>
> diff -ruN R-devel.orig/src/include/Rinternals.h R-devel/src/include/Rinternals.h
> --- R-devel.orig/src/include/Rinternals.h 2007-06-18 00:50:00.000000000 +0900
> +++ R-devel/src/include/Rinternals.h 2007-06-25 13:08:00.000000000 +0900
> @@ -725,7 +725,7 @@
> R_pstream_format_t type;
> int version;
> void (*OutChar)(R_outpstream_t, int);
> - void (*OutBytes)(R_outpstream_t, void *, int);
> + void (*OutBytes)(R_outpstream_t, void *, size_t);
> SEXP (*OutPersistHookFunc)(SEXP, SEXP);
> SEXP OutPersistHookData;
> };
> @@ -735,7 +735,7 @@
> R_pstream_data_t data;
> R_pstream_format_t type;
> int (*InChar)(R_inpstream_t);
> - void (*InBytes)(R_inpstream_t, void *, int);
> + void (*InBytes)(R_inpstream_t, void *, size_t);
> SEXP (*InPersistHookFunc)(SEXP, SEXP);
> SEXP InPersistHookData;
> };
> @@ -743,12 +743,12 @@
> void R_InitInPStream(R_inpstream_t stream, R_pstream_data_t data,
> R_pstream_format_t type,
> int (*inchar)(R_inpstream_t),
> - void (*inbytes)(R_inpstream_t, void *, int),
> + void (*inbytes)(R_inpstream_t, void *, size_t),
> SEXP (*phook)(SEXP, SEXP), SEXP pdata);
> void R_InitOutPStream(R_outpstream_t stream, R_pstream_data_t data,
> R_pstream_format_t type, int version,
> void (*outchar)(R_outpstream_t, int),
> - void (*outbytes)(R_outpstream_t, void *, int),
> + void (*outbytes)(R_outpstream_t, void *, size_t),
> SEXP (*phook)(SEXP, SEXP), SEXP pdata);
>
> void R_InitFileInPStream(R_inpstream_t stream, FILE *fp,
> diff -ruN R-devel.orig/src/main/serialize.c R-devel/src/main/serialize.c
> --- R-devel.orig/src/main/serialize.c 2007-06-18 00:50:02.000000000 +0900
> +++ R-devel/src/main/serialize.c 2007-06-25 19:32:05.000000000 +0900
> @@ -1529,7 +1529,7 @@
> R_InitInPStream(R_inpstream_t stream, R_pstream_data_t data,
> R_pstream_format_t type,
> int (*inchar)(R_inpstream_t),
> - void (*inbytes)(R_inpstream_t, void *, int),
> + void (*inbytes)(R_inpstream_t, void *, size_t),
> SEXP (*phook)(SEXP, SEXP), SEXP pdata)
> {
> stream->data = data;
> @@ -1544,7 +1544,7 @@
> R_InitOutPStream(R_outpstream_t stream, R_pstream_data_t data,
> R_pstream_format_t type, int version,
> void (*outchar)(R_outpstream_t, int),
> - void (*outbytes)(R_outpstream_t, void *, int),
> + void (*outbytes)(R_outpstream_t, void *, size_t),
> SEXP (*phook)(SEXP, SEXP), SEXP pdata)
> {
> stream->data = data;
> @@ -1574,14 +1574,14 @@
> return fgetc(fp);
> }
>
> -static void OutBytesFile(R_outpstream_t stream, void *buf, int length)
> +static void OutBytesFile(R_outpstream_t stream, void *buf, size_t length)
> {
> FILE *fp = stream->data;
> size_t out = fwrite(buf, 1, length, fp);
> if (out != length) error(_("write failed"));
> }
>
> -static void InBytesFile(R_inpstream_t stream, void *buf, int length)
> +static void InBytesFile(R_inpstream_t stream, void *buf, size_t length)
> {
> FILE *fp = stream->data;
> size_t in = fread(buf, 1, length, fp);
> @@ -1629,12 +1629,12 @@
> error(_("cannot write to this connection"));
> }
>
> -static void InBytesConn(R_inpstream_t stream, void *buf, int length)
> +static void InBytesConn(R_inpstream_t stream, void *buf, size_t length)
> {
> Rconnection con = (Rconnection) stream->data;
> CheckInConn(con);
> if (con->text) {
> - int i;
> + size_t i;
> char *p = buf;
> for (i = 0; i < length; i++)
> p[i] = Rconn_fgetc(con);
> @@ -1659,7 +1659,7 @@
> }
> }
>
> -static void OutBytesConn(R_outpstream_t stream, void *buf, int length)
> +static void OutBytesConn(R_outpstream_t stream, void *buf, size_t length)
> {
> Rconnection con = (Rconnection) stream->data;
> CheckOutConn(con);
> @@ -1817,7 +1817,7 @@
> bb->buf[bb->count++] = c;
> }
>
> -static void OutBytesBB(R_outpstream_t stream, void *buf, int length)
> +static void OutBytesBB(R_outpstream_t stream, void *buf, size_t length)
> {
> bconbuf_t bb = stream->data;
> if (bb->count + length > BCONBUFSIZ)
> @@ -1863,14 +1863,14 @@
> */
>
> typedef struct membuf_st {
> - int size;
> - int count;
> + size_t size;
> + size_t count;
> unsigned char *buf;
> } *membuf_t;
>
> -static void resize_buffer(membuf_t mb, int needed)
> +static void resize_buffer(membuf_t mb, size_t needed)
> {
> - int newsize = 2 * needed;
> + size_t newsize = 2 * needed;
> mb->buf = realloc(mb->buf, newsize);
> if (mb->buf == NULL)
> error(_("cannot allocate buffer"));
> @@ -1885,7 +1885,7 @@
> mb->buf[mb->count++] = c;
> }
>
> -static void OutBytesMem(R_outpstream_t stream, void *buf, int length)
> +static void OutBytesMem(R_outpstream_t stream, void *buf, size_t length)
> {
> membuf_t mb = stream->data;
> if (mb->count + length > mb->size)
> @@ -1902,7 +1902,7 @@
> return mb->buf[mb->count++];
> }
>
> -static void InBytesMem(R_inpstream_t stream, void *buf, int length)
> +static void InBytesMem(R_inpstream_t stream, void *buf, size_t length)
> {
> membuf_t mb = stream->data;
> if (mb->count + length > mb->size)
> @@ -1912,7 +1912,7 @@
> }
>
> static void InitMemInPStream(R_inpstream_t stream, membuf_t mb,
> - void *buf, int length,
> + void *buf, size_t length,
> SEXP (*phook)(SEXP, SEXP), SEXP pdata)
> {
> mb->count = 0;
> diff -ruN R-devel.orig/src/modules/internet/sockconn.c R-devel/src/modules/internet/sockconn.c
> --- R-devel.orig/src/modules/internet/sockconn.c 2007-06-03 00:50:32.000000000 +0900
> +++ R-devel/src/modules/internet/sockconn.c 2007-06-25 19:41:32.000000000 +0900
> @@ -155,14 +155,19 @@
> static size_t sock_read(void *ptr, size_t size, size_t nitems,
> Rconnection con)
> {
> + int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
> +
> + R_SockTimeout(timeout);
> return sock_read_helper(con, ptr, size * nitems)/size;
> }
>
> static size_t sock_write(const void *ptr, size_t size, size_t nitems,
> Rconnection con)
> {
> + int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
> Rsockconn this = (Rsockconn)con->private;
>
> + R_SockTimeout(timeout);
> return R_SockWrite(this->fd, ptr, size * nitems)/size;
> }
>
>
>
> 2007/6/25, Prof Brian Ripley <ripley_at_stats.ox.ac.uk>:
>> On Mon, 25 Jun 2007, Ei-ji Nakama wrote:
>>
>> > There is a problem of a very large memory request caused by sign extension.
>> >
>> > --- R-2.5.0.orig/src/main/serialize.c 2007-03-27 01:42:08.000000000 +0900
>> > +++ R-2.5.0/src/main/serialize.c 2007-06-25 00:48:58.000000000 +0900
>> > @@ -1866,7 +1866,7 @@
>> >
>> > static void resize_buffer(membuf_t mb, int needed)
>> > {
>> > - int newsize = 2 * needed;
>> > + size_t newsize = 2 * needed;
>> > mb->buf = realloc(mb->buf, newsize);
>> > if (mb->buf == NULL)
>> > error(_("cannot allocate buffer"));
>>
>> Yes, thanks, but the structure also needs to be changed as the next line
>> is
>>
>> mb->size = newsize;
>>
>> and so this would set mb->size to a negative value.
>>
>> Could you please tell us where you encountered this?
>>
>> As far as I can see, this code is only used via R_serialize for
>> serializing to a raw vector, in which case 'size' can safely be 'int' and
>> the re-allocation should be up to 2^31-1 bytes at most (and allocating
>> twice what you are asked for seems undesirable). But there is potential
>> overflow at
>>
>> if (mb->count + length > mb->size)
>>
>> and possibly elsewhere.
>>
>>
>> > The time-out of read and write was not set.
>> >
>> > 51:sendData.SOCKnode <- function(node, data) {
>> > 52: timeout <- getClusterOption("timeout")
>> > 53: old <- options(timeout = timeout);
>> > 54: on.exit(options(old))
>> > 55: serialize(data, node$con)
>> > 56: }
>> > 57:
>> > 58:recvData.SOCKnode <- function(node) {
>> > 59: timeout <- getClusterOption("timeout")
>> > 60: old <- options(timeout = timeout);
>> > 61: on.exit(options(old))
>> > 62: unserialize(node$con)
>> > 63: }
>>
>> I don't think sock_read/sock_write is the right place to make that
>> setting. Ideally it would be set when the option is set, but as this is
>> in a module, that needs an extension to the interface.
>>
>> Looking at the code, we read from a socket in blocks of 4096, but we write
>> in a single block. The latter is likely to be the problem, and I think
>> some redesigning is necessary here.
>>
>> Perhaps you and Luke Tierney can comment on exactly what the problem is
>> and how best to work around it.
>>
>> >
>> > --- R-2.5.0.orig/src/modules/internet/sockconn.c 2006-09-04 23:20:59.000000000 +0900
>> > +++ R-2.5.0/src/modules/internet/sockconn.c 2007-06-25 00:51:38.000000000 +0900
>> > @@ -155,14 +155,19 @@
>> > static size_t sock_read(void *ptr, size_t size, size_t nitems,
>> > Rconnection con)
>> > {
>> > + int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
>> > +
>> > + R_SockTimeout(timeout);
>> > return sock_read_helper(con, ptr, size * nitems)/size;
>> > }
>> >
>> > static size_t sock_write(const void *ptr, size_t size, size_t nitems,
>> > Rconnection con)
>> > {
>> > + int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
>> > Rsockconn this = (Rsockconn)con->private;
>> >
>> > + R_SockTimeout(timeout);
>> > return R_SockWrite(this->fd, ptr, size * nitems)/size;
>> > }
>> >
>> >
>>
>> --
>> Brian D. Ripley, ripley_at_stats.ox.ac.uk
>> Professor of Applied Statistics, http://www.stats.ox.ac.uk/~ripley/
>> University of Oxford, Tel: +44 1865 272861 (self)
>> 1 South Parks Road, +44 1865 272866 (PA)
>> Oxford OX1 3TG, UK Fax: +44 1865 272595
>>
>>
>>
>
>
>

-- 
Brian D. Ripley,                  ripley_at_stats.ox.ac.uk
Professor of Applied Statistics,  http://www.stats.ox.ac.uk/~ripley/
University of Oxford,             Tel:  +44 1865 272861 (self)
1 South Parks Road,                     +44 1865 272866 (PA)
Oxford OX1 3TG, UK                Fax:  +44 1865 272595

______________________________________________
R-devel_at_r-project.org mailing list
https://stat.ethz.ch/mailman/listinfo/r-devel
Received on Mon 25 Jun 2007 - 13:03:03 GMT

Archive maintained by Robert King, hosted by the discipline of statistics at the University of Newcastle, Australia.
Archive generated by hypermail 2.2.0, at Tue 26 Jun 2007 - 07:35:37 GMT.

Mailing list information is available at https://stat.ethz.ch/mailman/listinfo/r-devel. Please read the posting guide before posting to the list.