[Rd] RFC: "loop connections"

From: <dhinds_at_sonic.net>
Date: Mon 22 Aug 2005 - 23:34:15 GMT


I've just implemented a generalization of R's text connections that also supports reading and writing raw binary data. There is very little new code to speak of. For input connections, I wrote code to populate the old text connection buffer from a raw vector, and provided a new raw_read() method. For output connections, I wrote a raw_write() method that appends to a raw vector. On input, the mode (text or binary) is determined by the data type of the input object; on output, I use the requested output mode (i.e. "w" / "wb"). For example:

 > con <- loopConnection("r", "wb")
 > a <- c(10,100,1000)
 > writeBin(a, con, size=4)
 > r

  [1] 00 00 20 41 00 00 c8 42 00 00 7a 44
 > close(con)
 > con <- loopConnection(r)
 > readBin(con, "double", n=3, size=4)

 [1] 10 100 1000
 > close(con)

I think "loop connection" is a better name for this sort of connection than "text connection" was, even for the old version: the old name confuses the mode of the connection (text vs. binary) with the mechanism (file, socket, etc.).

I've appended a patch to the end of this message. As implemented here, textConnection is replaced by loopConnection, but functionally loopConnection is a superset of the old textConnection. For compatibility, one could add:

  textConnection <- function(...) loopConnection(...)

The patch is against R-2.1.1. I can investigate whether any changes are required for the current development tree. I can also update the documentation files as required. I thought I'd first check whether anyone else thought this was worth inclusion before spending more time on it.

The raw_write() code could be improved with smarter memory allocation (grabbing bigger chunks rather than reallocating the raw vector for every write), but this is at least a proof of principle.

-/* ------------------- text connections --------------------- */
+/* ------------------- loop connections --------------------- */
 

 /* read a R character vector into a buffer */  static void text_init(Rconnection con, SEXP text)  {

     int i, nlines = length(text), nchars = 0;
-    Rtextconn this = (Rtextconn)con->private;
+    Rloopconn this = (Rloopconn)con->private;
 
     for(i = 0; i < nlines; i++)
 	nchars += strlen(CHAR(STRING_ELT(text, i))) + 1;

@@ -1668,19 +1668,35 @@
this->cur = this->save = 0;

 }  

-static Rboolean text_open(Rconnection con)

+/* read a R raw vector into a buffer */
+static void raw_init(Rconnection con, SEXP raw)
+{
+    int nbytes = length(raw);
+    Rloopconn this = (Rloopconn)con->private;
+
+    this->data = (char *) malloc(nbytes);
+    if(!this->data) {
+	free(this); free(con->description); free(con->class); free(con);
+	error(_("cannot allocate memory for raw connection"));
+    }
+    memcpy(this->data, RAW(raw), nbytes);
+    this->nchars = nbytes;
+    this->cur = this->save = 0;
+}
+
+static Rboolean loop_open(Rconnection con)
 {
     con->save = -1000;
     return TRUE;

 }  

-static void text_close(Rconnection con) +static void loop_close(Rconnection con)  {
 }  

-static void text_destroy(Rconnection con) +static void loop_destroy(Rconnection con)  {
- Rtextconn this = (Rtextconn)con->private; + Rloopconn this = (Rloopconn)con->private;  

     free(this->data);
     /* this->cur = this->nchars = 0; */

@@ -1689,7 +1705,7 @@
 

 static int text_fgetc(Rconnection con)
 {
- Rtextconn this = (Rtextconn)con->private; + Rloopconn this = (Rloopconn)con->private;

     if(this->save) {
 	int c;
 	c = this->save;

@@ -1700,48 +1716,69 @@
else return (int) (this->data[this->cur++]);
 }  

-static double text_seek(Rconnection con, double where, int origin, int rw) +static double loop_seek(Rconnection con, double where, int origin, int rw)  {
- if(where >= 0) error(_("seek is not relevant for text connection")); + if(where >= 0) error(_("seek is not relevant for loop connection"));

     return 0; /* if just asking, always at the beginning */  }  

-static Rconnection newtext(char *description, SEXP text)

+static size_t raw_read(void *ptr, size_t size, size_t nitems,
+		       Rconnection con)
+{
+    Rloopconn this = (Rloopconn)con->private;
+    if (this->cur + size*nitems > this->nchars) {
+	nitems = (this->nchars - this->cur)/size;
+	memcpy(ptr, this->data+this->cur, size*nitems);
+	this->cur = this->nchars;
+    } else {
+	memcpy(ptr, this->data+this->cur, size*nitems);
+	this->cur += size*nitems;
+    }
+    return nitems;
+}
+
+static Rconnection newloop(char *description, SEXP data)
 {
     Rconnection new;
     new = (Rconnection) malloc(sizeof(struct Rconn));
- if(!new) error(_("allocation of text connection failed")); - new->class = (char *) malloc(strlen("textConnection") + 1); + if(!new) error(_("allocation of loop connection failed")); + new->class = (char *) malloc(strlen("loopConnection") + 1);
     if(!new->class) {
 	free(new);
-	error(_("allocation of text connection failed"));
+	error(_("allocation of loop connection failed"));
     }

- strcpy(new->class, "textConnection"); + strcpy(new->class, "loopConnection");
     new->description = (char *) malloc(strlen(description) + 1);
     if(!new->description) {
 	free(new->class); free(new);
-	error(_("allocation of text connection failed"));
+	error(_("allocation of loop connection failed"));
     }
     init_con(new, description, "r");
     new->isopen = TRUE;
     new->canwrite = FALSE;
-    new->open = &text_open;
-    new->close = &text_close;
-    new->destroy = &text_destroy;
-    new->fgetc = &text_fgetc;
-    new->seek = &text_seek;
-    new->private = (void*) malloc(sizeof(struct textconn));
+    new->open = &loop_open;
+    new->close = &loop_close;
+    new->destroy = &loop_destroy;

+ new->seek = &loop_seek;
+ new->private = (void*) malloc(sizeof(struct loopconn));
     if(!new->private) {
 	free(new->description); free(new->class); free(new);
-	error(_("allocation of text connection failed"));
+	error(_("allocation of loop connection failed"));
+    }
+    new->text = isString(data);
+    if (new->text) {
+	new->fgetc = &text_fgetc;
+	text_init(new, data);
+    } else {
+	new->read = &raw_read;
+	raw_init(new, data);
     }
-    text_init(new, text);
     return new;

 }  

-static void outtext_close(Rconnection con) +static void outloop_close(Rconnection con)  {
- Routtextconn this = (Routtextconn)con->private; + Routloopconn this = (Routloopconn)con->private;

     SEXP tmp;
     int idx = ConnIndex(con);
 

@@ -1755,9 +1792,9 @@
SET_VECTOR_ELT(OutTextData, idx, R_NilValue);
 }  

-static void outtext_destroy(Rconnection con) +static void outloop_destroy(Rconnection con)  {
- Routtextconn this = (Routtextconn)con->private; + Routloopconn this = (Routloopconn)con->private;

     free(this->lastline); free(this);
 }  

@@ -1765,7 +1802,7 @@
 

 static int text_vfprintf(Rconnection con, const char *format, va_list ap)  {
- Routtextconn this = (Routtextconn)con->private; + Routloopconn this = (Routloopconn)con->private;

     char buf[BUFSIZE], *b = buf, *p, *q, *vmax = vmaxget();
     int res = 0, usedRalloc = FALSE, buffree,
 	already = strlen(this->lastline);

@@ -1830,24 +1867,41 @@
return res;

 }  

-static void outtext_init(Rconnection con, char *mode, int idx)

+static size_t raw_write(const void *ptr, size_t size, size_t nitems,
+			Rconnection con)
+{
+    Routloopconn this = (Routloopconn)con->private;
+    SEXP tmp;
+    int idx = ConnIndex(con);
+
+    PROTECT(tmp = lengthgets(this->data, this->len + size*nitems));
+    memcpy(RAW(tmp)+this->len, ptr, size*nitems);
+    this->len += size*nitems;
+    defineVar(this->namesymbol, tmp, VECTOR_ELT(OutTextData, idx));
+    this->data = tmp;
+    UNPROTECT(1);
+    return nitems;
+}
+
+static void outloop_init(Rconnection con, char *mode, int idx)
 {
- Routtextconn this = (Routtextconn)con->private; + Routloopconn this = (Routloopconn)con->private; + int st = (con->text ? STRSXP : RAWSXP);

     SEXP val;  

     this->namesymbol = install(con->description); - if(strcmp(mode, "w") == 0) {
+ if(strncmp(mode, "w", 1) == 0) {

 	/* create variable pointed to by con->description */
-	PROTECT(val = allocVector(STRSXP, 0));
+	PROTECT(val = allocVector(st, 0));
 	defineVar(this->namesymbol, val, VECTOR_ELT(OutTextData, idx));
 	UNPROTECT(1);
     } else {
 	/* take over existing variable */
 	val = findVar1(this->namesymbol, VECTOR_ELT(OutTextData, idx),
-		       STRSXP, FALSE);
+		       st, FALSE);
 	if(val == R_UnboundValue) {
-	    warning(_("text connection: appending to a non-existent char vector"));
-	    PROTECT(val = allocVector(STRSXP, 0));
+	    warning(_("loop connection: appending to a non-existent vector"));
+	    PROTECT(val = allocVector(st, 0));
 	    defineVar(this->namesymbol, val, VECTOR_ELT(OutTextData, idx));
 	    UNPROTECT(1);
 	}

@@ -1859,49 +1913,55 @@

 }    

-static Rconnection newouttext(char *description, SEXP sfile, char *mode, +static Rconnection newoutloop(char *description, SEXP sfile, char *mode,

                               int idx)
 {
+ int isText = (mode[1] != 'b');

     Rconnection new;
     void *tmp;
 
     new = (Rconnection) malloc(sizeof(struct Rconn));
- if(!new) error(_("allocation of text connection failed")); - new->class = (char *) malloc(strlen("textConnection") + 1); + if(!new) error(_("allocation of loop connection failed")); + new->class = (char *) malloc(strlen("loopConnection") + 1);
     if(!new->class) {
 	free(new);
-	error(_("allocation of text connection failed"));
+	error(_("allocation of loop connection failed"));
     }

- strcpy(new->class, "textConnection"); + strcpy(new->class, "loopConnection");
     new->description = (char *) malloc(strlen(description) + 1);
     if(!new->description) {
 	free(new->class); free(new);
-	error(_("allocation of text connection failed"));
+	error(_("allocation of loop connection failed"));
     }
     init_con(new, description, mode);
+    new->text = isText;
     new->isopen = TRUE;
     new->canread = FALSE;
-    new->open = &text_open;
-    new->close = &outtext_close;
-    new->destroy = &outtext_destroy;
-    new->vfprintf = &text_vfprintf;
-    new->seek = &text_seek;
-    new->private = (void*) malloc(sizeof(struct outtextconn));
+    new->open = &loop_open;
+    new->close = &outloop_close;
+    new->destroy = &outloop_destroy;

+ new->seek = &loop_seek;
+ new->private = (void*) malloc(sizeof(struct outloopconn));
     if(!new->private) {
 	free(new->description); free(new->class); free(new);
-	error(_("allocation of text connection failed"));
+	error(_("allocation of loop connection failed"));
     }

- ((Routtextconn)new->private)->lastline = tmp = malloc(LAST_LINE_LEN); + ((Routloopconn)new->private)->lastline = tmp = malloc(LAST_LINE_LEN);
     if(!tmp) {
 	free(new->private);
 	free(new->description); free(new->class); free(new);
-	error(_("allocation of text connection failed"));
+	error(_("allocation of loop connection failed"));
     }

- outtext_init(new, mode, idx);
+    if (isText) {
+	new->vfprintf = &text_vfprintf;
+    } else {
+	new->write = &raw_write;
+    }
+    outloop_init(new, mode, idx);
     return new;

 }  

-SEXP do_textconnection(SEXP call, SEXP op, SEXP args, SEXP env) +SEXP do_loopconnection(SEXP call, SEXP op, SEXP args, SEXP env)  {

     SEXP sfile, stext, sopen, ans, class, venv;
     char *desc, *open;

@@ -1914,8 +1974,6 @@
error(_("invalid 'description' argument")); desc = CHAR(STRING_ELT(sfile, 0)); stext = CADR(args); - if(!isString(stext)) - error(_("invalid 'text' argument")); sopen = CADDR(args); if(!isString(sopen) || length(sopen) != 1) error(_("invalid 'open' argument"));
@@ -1924,16 +1982,20 @@
if (!isEnvironment(venv) && venv != R_NilValue) error(_("invalid 'environment' argument")); ncon = NextConnection(); - if(!strlen(open) || strncmp(open, "r", 1) == 0) - con = Connections[ncon] = newtext(desc, stext);
- else if (strncmp(open, "w", 1) == 0 || strncmp(open, "a", 1) == 0) {
+    if(!strlen(open) || strncmp(open, "r", 1) == 0) {
+	if(!isString(stext) && (TYPEOF(stext) != RAWSXP))
+	    error(_("invalid 'object' argument"));
+	con = Connections[ncon] = newloop(desc, stext);
+    } else if (strncmp(open, "w", 1) == 0 || strncmp(open, "a", 1) == 0) {
+	if(!isString(stext))
+	    error(_("invalid 'object' argument"));
 	if (OutTextData == NULL) {
 	    OutTextData = allocVector(VECSXP, NCONNECTIONS);
 	    R_PreserveObject(OutTextData);
 	}
 	SET_VECTOR_ELT(OutTextData, ncon, venv);
 	con = Connections[ncon] =
-	    newouttext(CHAR(STRING_ELT(stext, 0)), sfile, open, ncon);
+	    newoutloop(CHAR(STRING_ELT(stext, 0)), sfile, open, ncon);
     }
     else
 	errorcall(call, _("unsupported mode"));

@@ -1942,7 +2004,7 @@
PROTECT(ans = allocVector(INTSXP, 1)); INTEGER(ans)[0] = ncon; PROTECT(class = allocVector(STRSXP, 2)); - SET_STRING_ELT(class, 0, mkChar("textConnection")); + SET_STRING_ELT(class, 0, mkChar("loopConnection")); SET_STRING_ELT(class, 1, mkChar("connection")); classgets(ans, class); UNPROTECT(2); --- src/main/names.c.orig 2005-05-20 05:51:46.000000000 -0700 +++ src/main/names.c 2005-08-22 15:59:47.968828400 -0700
@@ -866,7 +866,7 @@
{"pushBack", do_pushback, 0, 11, 3, {PP_FUNCALL, PREC_FN, 0}}, {"clearPushBackLength",do_clearpushback,0, 11, 1, {PP_FUNCALL, PREC_FN, 0}}, {"pushBackLength",do_pushbacklength,0, 11, 1, {PP_FUNCALL, PREC_FN, 0}}, -{"textConnection",do_textconnection,0, 11, 4, {PP_FUNCALL, PREC_FN, 0}}, +{"loopConnection",do_loopconnection,0, 11, 4, {PP_FUNCALL, PREC_FN, 0}}, {"socketConnection",do_sockconn,0, 11, 6, {PP_FUNCALL, PREC_FN, 0}}, {"sockSelect",do_sockselect,0, 11, 3, {PP_FUNCALL, PREC_FN, 0}}, {"getAllConnections",do_getallconnections,0,11, 0, {PP_FUNCALL, PREC_FN, 0}}, --- src/include/Rconnections.h.orig 2005-04-18 04:34:02.000000000 -0700 +++ src/include/Rconnections.h 2005-08-22 15:40:02.582767400 -0700
@@ -82,19 +82,19 @@
int cp;

 } *Rgzfileconn;  

-typedef struct textconn {
+typedef struct loopconn {

     char *data;  /* all the data */
     int cur, nchars; /* current pos and number of chars */
     char save; /* pushback */

-} *Rtextconn;
+} *Rloopconn;  

-typedef struct outtextconn {
+typedef struct outloopconn {

     int len;  /* number of lines */
     SEXP namesymbol;
     SEXP data;
     char *lastline;
     int lastlinelength; /* buffer size */
-} *Routtextconn;
+} *Routloopconn;  

 typedef enum {HTTPsh, FTPsh} UrlScheme;  

-textConnection <- function(object, open = "r", local = FALSE) { +loopConnection <- function(object, open = "r", local = FALSE) {

     if (local) env <- parent.frame()
     else env <- .GlobalEnv

- .Internal(textConnection(deparse(substitute(object)), object, open, env)) + .Internal(loopConnection(deparse(substitute(object)), object, open, env))  }  

 seek <- function(con, ...)



R-devel@r-project.org mailing list
https://stat.ethz.ch/mailman/listinfo/r-devel Received on Tue Aug 23 10:15:01 2005

This archive was generated by hypermail 2.1.8 : Mon 20 Feb 2006 - 03:21:18 GMT