Main Page | Modules | Data Structures | Directories | File List | Data Fields | Globals

io.c

00001 /*
00002  * Copyright (c) 2005, 2006 by KoanLogic s.r.l. <http://www.koanlogic.com>
00003  * All rights reserved.
00004  *
00005  * This file is part of KLone, and as such it is subject to the license stated
00006  * in the LICENSE file which you have received as part of this distribution.
00007  *
00008  * $Id: io.c,v 1.38 2007/10/26 11:21:51 tho Exp $
00009  */
00010 
00011 #include "klone_conf.h"
00012 #include <unistd.h>
00013 #include <u/libu.h>
00014 #include <klone/io.h>
00015 #include <klone/io.h>
00016 #include <klone/ioprv.h>
00017 #include <klone/codec.h>
00018 
00019 enum io_type_e io_type(io_t *io);
00020 
00021 enum { 
00022     IO_RD_BUFSZ = 4096, 
00023     IO_WR_BUFSZ = 4096
00024 };
00025 
00026 #define IO_WBUF_AVAIL(io) (io->wbsz - io->wcount)
00027 #define IO_WBUF_FULL(io) (io->wbsz == io->wcount)
00028 
00029 static inline int io_transform_codec_buffer(io_t *, codec_t *, char *, 
00030         size_t *);
00031 
00043 enum io_type_e io_type(io_t *io)
00044 {
00045     return io->type;
00046 }
00047 
00061 ssize_t io_pipe(io_t *out, io_t *in)
00062 {
00063     enum { BUFSZ = 4096 };
00064     char buf[BUFSZ];
00065     size_t count = 0;   /* # of bytes piped */
00066     ssize_t c;
00067 
00068     dbg_err_if (out == NULL);
00069     dbg_err_if (in == NULL);
00070     
00071     for(;;)
00072     {
00073         c = io_read(in, buf, BUFSZ);
00074         dbg_err_if(c < 0);
00075         if(c == 0)
00076             break; /* eof */
00077 
00078         /* write it out */
00079         dbg_err_if(io_write(out, buf, c) < 0);
00080 
00081         count += c;
00082     }
00083 
00084     return count;
00085 err:
00086     return -1;
00087 }
00088 
00106 int io_dup(io_t *io, io_t **pio)
00107 {
00108     dbg_return_if (io == NULL, ~0);
00109     dbg_return_if (pio == NULL, ~0);
00110     
00111     io->refcnt++;
00112 
00113     *pio = io;
00114 
00115     return 0;
00116 }
00117 
00132 ssize_t io_copy(io_t *out, io_t *in, size_t size)
00133 {
00134     enum { BUFSZ = 4096 };
00135     char buf[BUFSZ];
00136     size_t rem = size;
00137     ssize_t c;
00138 
00139     dbg_err_if (in == NULL);
00140     dbg_err_if (out == NULL);
00141 
00142     while(rem)
00143     {
00144         c = io_read(in, buf, MIN(BUFSZ, rem));
00145         dbg_err_if(c < 0);
00146         if(c == 0)
00147             break; /* eof */
00148 
00149         /* write it out */
00150         dbg_err_if(io_write(out, buf, c) < 0);
00151 
00152         rem -= c;
00153     }
00154 
00155     return size - rem;
00156 err:
00157     return -1;
00158 }
00159 
00160 
00175 ssize_t io_seek(io_t *io, size_t off)
00176 {
00177     dbg_err_if (io == NULL);
00178     dbg_err_if (io_flush(io));
00179 
00180     if(io->seek)
00181         return io->seek(io, off);
00182 err:
00183     return -1;
00184 }
00185 
00197 ssize_t io_tell(io_t *io)
00198 {
00199     dbg_err_if (io == NULL);
00200     dbg_err_if (io_flush(io));
00201 
00202     if(io->tell)
00203         return io->tell(io);
00204 err:
00205     return -1;
00206 }
00207 
00208 /* alloc the read buffer */
00209 static int io_rbuf_alloc(io_t *io)
00210 {
00211     dbg_err_if (io == NULL);
00212     
00213     io->rbsz = IO_RD_BUFSZ; /* set the buffer size */
00214 
00215     /* if a dup'd io_t already alloc'd a buffer then use it */
00216     io->rbuf = (char*)u_malloc(io->rbsz);
00217     dbg_err_if(io->rbuf == NULL);
00218     io->roff = 0;
00219 
00220     io->ubuf = (char*)u_malloc(io->rbsz);
00221     dbg_err_if(io->ubuf == NULL);
00222 
00223     return 0;
00224 err:
00225     return ~0;
00226 }
00227 
00228 /* alloc the write buffer */
00229 static int io_wbuf_alloc(io_t *io)
00230 {
00231     dbg_err_if (io == NULL);
00232 
00233     io->wbsz = IO_WR_BUFSZ; /* set the buffer size */
00234 
00235     /* if a dup'd io_t already alloc'd a buffer then use it */
00236     io->wbuf = (char*)u_malloc(io->wbsz);
00237     dbg_err_if(io->wbuf == NULL);
00238 
00239     return 0;
00240 err:
00241     return ~0;
00242 }
00243 
00244 static ssize_t io_transform(io_t *io, codec_t *codec, 
00245     char *dst, size_t *dcount, const char *src, size_t sz)
00246 {
00247     ssize_t c;
00248     size_t cavail, sdcount = *dcount; /* saved dcount */
00249 
00250     dbg_err_if (io == NULL);
00251     dbg_err_if (codec == NULL);
00252     dbg_err_if (src == NULL);
00253     dbg_err_if (dst == NULL);
00254     dbg_err_if (dcount == NULL);
00255 
00256     if(codec == TAILQ_LAST(&io->codec_chain, codec_chain_s))
00257     {
00258         /* last codec in the chain */
00259         c = codec->transform(codec, dst, dcount, src, sz); 
00260         dbg_err_if(c == 0 && *dcount == 0);
00261         return c;
00262     } else {
00263         c = 0;
00264         do
00265         {
00266             *dcount = sdcount;
00267             if(codec->ccount)
00268                 dbg_err_if(io_transform_codec_buffer(io, codec, dst, dcount));
00269             else
00270                 *dcount = 0; /* no bytes written to 'dst' */
00271 
00272             c = 0; /* zero byte of 'src' consumed */
00273             cavail = CODEC_BUFSZ - codec->ccount - codec->coff;
00274             if(sz && cavail)
00275             {
00276                 dbg_err_if((c = codec->transform(codec, 
00277                     codec->cbuf + codec->coff + codec->ccount, 
00278                     &cavail, src, sz)) < 0);
00279 
00280                 codec->ccount += cavail;
00281                 dbg_err_if(c == 0 && cavail == 0);
00282             }
00283         } while(c == 0 && *dcount == 0 && (codec->ccount || sz));
00284 
00285         return c;
00286     }
00287 
00288 err:
00289     return -1;
00290 }
00291 
00292 static inline int io_transform_codec_buffer(io_t *io, codec_t *codec, 
00293     char *dst, size_t *dcount)
00294 {
00295     ssize_t ct;
00296 
00297     dbg_err_if (io == NULL);
00298     dbg_err_if (codec == NULL);
00299     dbg_err_if (dst == NULL);
00300     dbg_err_if (dcount == NULL);
00301 
00302     if(codec->ccount)
00303     {
00304         dbg_err_if((ct = io_transform(io, TAILQ_NEXT(codec, np), 
00305             dst, dcount, codec->cbuf + codec->coff,codec->ccount)) < 0);
00306 
00307         codec->ccount -= ct;
00308 
00309         if(codec->ccount > 0)
00310             codec->coff += ct;
00311         else
00312             codec->coff = 0;
00313     }
00314 
00315     return 0;
00316 err:
00317     return ~0;
00318 }
00319 
00320 static inline ssize_t io_transfer(io_t *io, char *dst, size_t *dcount, 
00321         const char *src, size_t sz)
00322 {
00323     dbg_err_if (io == NULL);
00324     dbg_err_if (src == NULL);
00325     dbg_err_if (dst == NULL);
00326     dbg_err_if (dcount == NULL);
00327 
00328     if(!TAILQ_EMPTY(&io->codec_chain))
00329     {
00330         return io_transform(io, TAILQ_FIRST(&io->codec_chain), 
00331             dst, dcount, src, sz); 
00332     } else {
00333         ssize_t wr = MIN(sz, *dcount); 
00334         memcpy(dst, src, wr);
00335         *dcount = wr;
00336         return wr;
00337     }
00338 err:
00339     return -1;
00340 }
00341 
00342 static int io_chain_flush_chunk(io_t *io, char *dst, size_t *dcount)
00343 {
00344     int er;
00345     codec_t *codec;
00346     size_t sz, sdcount;
00347 
00348     dbg_err_if (io == NULL);
00349     dbg_err_if (dst == NULL);
00350     dbg_err_if (dcount == NULL);
00351 
00352     sdcount = *dcount;
00353 
00354     TAILQ_FOREACH(codec, &io->codec_chain, np)
00355     {
00356         *dcount = sdcount;
00357         if(codec == TAILQ_LAST(&io->codec_chain, codec_chain_s))
00358         {
00359             return codec->flush(codec, dst, dcount);
00360         } else {
00361             for(;;)
00362             {
00363                 *dcount = sdcount;
00364                 if(codec->ccount)
00365                 {
00366                     int rc = io_transform_codec_buffer(io, codec, dst, dcount);
00367                     dbg_err_if (rc);
00368 
00369                     if(*dcount)
00370                         return CODEC_FLUSH_CHUNK; /* call flush again */
00371                 } else
00372                     *dcount = 0; /* no bytes written to 'dst' */
00373 
00374                 sz = CODEC_BUFSZ - codec->ccount - codec->coff;
00375                 dbg_err_if((er = codec->flush(codec, 
00376                     codec->cbuf + codec->coff + codec->ccount, &sz)) < 0);
00377 
00378                 codec->ccount += sz;
00379 
00380                 if(er == CODEC_FLUSH_COMPLETE && codec->ccount == 0)
00381                     break; /* flush of this codec completed */
00382             } /* for */
00383         }
00384     }
00385 
00386     return CODEC_FLUSH_COMPLETE;
00387 err:
00388     return -1;
00389 }
00390 
00391 static int io_chain_flush(io_t *io)
00392 {
00393     enum { BUFSZ = 4096 };
00394     int er;
00395     size_t sz, count;
00396     char buf[BUFSZ], *ptr;
00397 
00398     dbg_err_if (io == NULL);
00399 
00400     for(;;)
00401     {
00402         /* note: we cannot write straight to the wbuf otherwise codec 
00403          * transform & flush functions cannot call io_write safely; so we use 
00404          * a temp buffer and memcpy it to the wbuf after flushing */
00405         count = BUFSZ;
00406         if((er = io_chain_flush_chunk(io, buf, &count)) == CODEC_FLUSH_COMPLETE)
00407             break; /* flush complete */
00408         
00409         dbg_err_if(er < 0);
00410 
00411         for(ptr = buf; count; )
00412         {
00413             if(IO_WBUF_FULL(io))
00414                 dbg_err_if(io_flush(io));
00415 
00416             sz = MIN(count, IO_WBUF_AVAIL(io));
00417             memcpy(io->wbuf + io->wcount, ptr, sz);
00418 
00419             count -= sz;
00420             io->wcount += sz;
00421             ptr += sz;
00422         }
00423     } 
00424 
00425     return 0;
00426 err:
00427     return ~0;
00428 }
00429 
00442 int io_close(io_t *io)
00443 {
00444     dbg_err_if (io == NULL);
00445 
00446     if(io->close)
00447         dbg_err_if(io->close(io));
00448 
00449     return 0;
00450 err:
00451     return ~0;
00452 }
00453 
00469 int io_free(io_t *io)
00470 {
00471     dbg_err_if (io == NULL);
00472     dbg_err_if (io->refcnt == 0);
00473 
00474     /* skip if this io_t has been dup'd and there are still one or more 
00475        references in use */
00476     if(--io->refcnt)
00477         return 0;
00478 
00479     /* flush, remove and free all codecs */
00480     dbg_if(io_codecs_remove(io));
00481 
00482     dbg_if(io_flush(io));
00483 
00484     /* flush and close the stream (but don't free it) */
00485     dbg_if(io_close(io));
00486 
00487     /* free per-type alloc'ed data */
00488     if(io->free)
00489         dbg_if(io->free(io));
00490 
00491     U_FREE(io->rbuf);
00492     U_FREE(io->ubuf);
00493     U_FREE(io->wbuf);
00494     U_FREE(io->name);
00495     U_FREE(io);
00496 
00497     return 0;
00498 err:
00499     return ~0;
00500 }
00501 
00502 /* refill the read buffer */
00503 static ssize_t io_underflow(io_t *io)
00504 {
00505     ssize_t c;
00506     size_t sz;
00507 
00508     dbg_err_if (io == NULL);
00509 
00510     if(io->rbuf == NULL)
00511         dbg_err_if(io_rbuf_alloc(io)); /* alloc the read buffer */
00512 
00513     while(io->rcount == 0)
00514     {
00515         if(io->ucount == 0)
00516         {   /* fetch some bytes from the device and fill the rbuffer */
00517             dbg_err_if((c = io->read(io, io->ubuf, io->rbsz)) < 0);
00518             if(c == 0)
00519             {   /* eof, try to get some buffered (already transformed) bytes 
00520                    from the codec */
00521                 if(!TAILQ_EMPTY(&io->codec_chain))
00522                 {
00523                     sz = io->rbsz;
00524                     c = io_chain_flush_chunk(io, io->rbuf, &sz);
00525                     dbg_err_if(c < 0);
00526                     io->rcount += sz;
00527                     io->roff = 0;
00528                     if(c == 0)
00529                         io->eof++;
00530                 }
00531                 break;
00532             }
00533             io->ucount += c;
00534         }
00535 
00536         /* transform all data in the buffer */
00537         sz = io->rbsz - io->rcount;
00538         dbg_err_if((c = io_transfer(io, io->rbuf, &sz, 
00539             io->ubuf + io->uoff, io->ucount)) < 0);
00540         dbg_err_if(c < 0);
00541         dbg_err_if(c == 0 && sz == 0);
00542         io->ucount -= c;
00543         if(io->ucount == 0)
00544             io->uoff = 0;
00545         else
00546             io->uoff += c;
00547 
00548         io->rcount = sz;
00549         io->roff = 0;
00550     }
00551 
00552     return io->rcount;
00553 err:
00554     return -1;
00555 }
00556 
00572 ssize_t io_read(io_t *io, char *buf, size_t size)
00573 {
00574     char *out = buf;
00575     size_t wr;
00576     ssize_t c;
00577 
00578     dbg_err_if (io == NULL);
00579     dbg_err_if (buf == NULL);
00580     
00581     if(io->eof)
00582         return 0;
00583 
00584     while(size)
00585     {
00586         if(io->rcount == 0)
00587         {
00588             dbg_err_if((c = io_underflow(io)) < 0);
00589             if(c == 0)
00590                 break;
00591         }
00592         /* copy out data */
00593         wr = MIN(io->rcount, size); 
00594         memcpy(out, io->rbuf + io->roff, wr);
00595         io->rcount -= wr;
00596         io->roff += wr;
00597         out += wr;
00598         size -= wr;
00599     }
00600 
00601     return out - buf;
00602 err:
00603     return -1;
00604 }
00605 
00620 ssize_t io_printf(io_t *io, const char *fmt, ...)
00621 {
00622     enum { BUFSZ = 2048 };
00623     char buf[BUFSZ], *bbuf = NULL; 
00624     va_list ap, ap2;
00625     int sz;
00626 
00627     dbg_err_if (io == NULL);
00628     dbg_err_if (fmt == NULL);
00629 
00630     /* build the message to print */
00631     va_start(ap, fmt); /* init variable list arguments */
00632 
00633     sz = vsnprintf(buf, BUFSZ, fmt, ap);
00634 
00635     va_end(ap);
00636 
00637     if(sz >= BUFSZ)
00638     { /* stack buffer too small, alloc a bigger one on the heap */
00639         va_start(ap2, fmt); /* don't know if ap can be reused... */
00640 
00641         bbuf = (char*)u_malloc(++sz);
00642         dbg_err_if(bbuf == NULL);
00643 
00644         if((sz = vsnprintf(bbuf, sz, fmt, ap2)) > 0)
00645         {
00646             dbg_err_if(io_write(io, bbuf, sz) < 0);
00647         }
00648 
00649         va_end(ap2);
00650 
00651         U_FREE(bbuf);
00652     } else if(sz > 0) {
00653         dbg_err_if(io_write(io, buf, sz) < 0);
00654     }
00655 
00656     return 0;
00657 err:
00658     return -1;
00659 }
00660 
00671 ssize_t io_flush(io_t *io)
00672 {
00673     ssize_t c;
00674     size_t off = 0;
00675 
00676     dbg_err_if (io == NULL);
00677  
00678     while(io->wcount)
00679     {
00680         c = io->write(io, io->wbuf + off, io->wcount);
00681         dbg_err_if(c < 0);
00682         if(c == 0)
00683             break;
00684     
00685         io->wcount -= c;
00686         off += c;
00687     }
00688 
00689     return 0;
00690 err:
00691     return -1;
00692 }
00693 
00706 ssize_t io_write(io_t *io, const char *buf, size_t size)
00707 {
00708     size_t sz = 0, rem = size;
00709     ssize_t c = 0;
00710 
00711     dbg_err_if (io == NULL);
00712     dbg_err_if (buf == NULL);
00713     
00714     if(io->wbuf == NULL)
00715         dbg_err_if(io_wbuf_alloc(io)); /* alloc the write buffer */
00716 
00717     while(rem)
00718     {
00719         if(IO_WBUF_FULL(io)) /* if there's no more free space */
00720             dbg_err_if(io_flush(io));
00721 
00722         sz = IO_WBUF_AVAIL(io);
00723         c = io_transfer(io, io->wbuf + io->wcount, &sz, buf, rem);
00724         dbg_err_if(c < 0);
00725         dbg_err_if(c == 0 && sz == 0); /* some bytes MUST be read or written */
00726 
00727         io->wcount += sz;
00728         buf += c;
00729         rem -= c;
00730     }
00731 
00732     return size;
00733 err:
00734     return -1;
00735 }
00736 
00748 inline ssize_t io_putc(io_t *io, char c)
00749 {
00750     return io_write(io, &c, 1);
00751 }
00752 
00764 inline ssize_t io_getc(io_t *io, char *pc)
00765 {
00766     return io_read(io, pc, 1);
00767 }
00768 
00769 static inline char *io_strnchr(char *buf, size_t sz, char c)
00770 {
00771     register char *p;
00772 
00773     dbg_goto_if (buf == NULL, end);
00774 
00775     p = buf;
00776 
00777     while(sz--)
00778     {
00779         if(*p == c)
00780             return p;
00781         ++p;
00782     }
00783 end:
00784     return NULL;
00785 }
00786 
00802 ssize_t io_get_until(io_t *io, char stop_at, char *buf, size_t size)
00803 {
00804     ssize_t wr, c, len = 0;
00805     char *p;
00806 
00807     dbg_err_if (io == NULL);
00808     dbg_err_if (buf == NULL);
00809 
00810     if(size < 2)
00811         return -1; /* buf too small */
00812 
00813     --size; /* save a char for \0 */
00814 
00815     if(io->rcount == 0)
00816         dbg_err_if(io_underflow(io) < 0);
00817 
00818     if(io->rcount == 0)
00819         return 0;
00820 
00821     for(;;)
00822     {
00823         if((p = io_strnchr(io->rbuf + io->roff, io->rcount, stop_at)) != NULL)
00824         {
00825             p++; /* jump over 'stop_at' char*/
00826             wr = MIN(p - (io->rbuf + io->roff), size);
00827             memcpy(buf, io->rbuf + io->roff, wr);
00828             buf[wr] = 0;
00829             io->rcount -= wr;
00830             io->roff += wr;
00831             len += wr;
00832             break;
00833         } else {
00834             if(size >= io->rcount)
00835             {
00836                 memcpy(buf, io->rbuf + io->roff, io->rcount);
00837                 len += io->rcount;
00838                 buf += io->rcount;
00839                 size -= io->rcount;
00840                 io->rcount = 0;
00841                 io->roff = 0;
00842                 dbg_err_if((c = io_underflow(io)) < 0);
00843                 if(c == 0)
00844                     break;
00845             } else {
00846                 /* buffer too small, return a partial line */
00847                 memcpy(buf, io->rbuf + io->roff, size);
00848                 len += size;
00849                 io->rcount -= size;
00850                 io->roff += size;
00851                 break;
00852             }
00853         }
00854     }
00855 
00856     buf[len] = 0;
00857     return len; /* return the # of chars in the line (strlen(line)) */
00858 err:
00859     return -1;
00860 }
00861 
00875 ssize_t io_gets(io_t *io, char *buf, size_t size)
00876 {
00877     return io_get_until(io, '\n', buf, size);
00878 }
00879 
00889 int io_codec_add_head(io_t *io, codec_t* c)
00890 {
00891     dbg_return_if (io == NULL, ~0);
00892     dbg_return_if (c == NULL, ~0);
00893 
00894     /* insert the codec at the head of the chain */
00895     TAILQ_INSERT_HEAD(&io->codec_chain, c, np);
00896 
00897     return 0;
00898 }
00899 
00909 int io_codec_add_tail(io_t *io, codec_t* c)
00910 {
00911     dbg_return_if (io == NULL, ~0);
00912     dbg_return_if (c == NULL, ~0);
00913 
00914     /* insert the codec at the end of the chain */
00915     TAILQ_INSERT_TAIL(&io->codec_chain, c, np);
00916 
00917     return 0;
00918 }
00919 
00928 int io_codecs_remove(io_t *io)
00929 {
00930     codec_t *codec;
00931     int rc = 0;
00932 
00933     dbg_return_if (io == NULL, ~0);
00934 
00935     if(!TAILQ_EMPTY(&io->codec_chain))
00936     {
00937         if(io->wbuf)
00938             dbg_ifb(io_chain_flush(io))
00939                 rc++;
00940 
00941         while((codec = TAILQ_FIRST(&io->codec_chain)) != NULL)
00942         {
00943             TAILQ_REMOVE(&io->codec_chain, codec, np);
00944             codec_free(codec);
00945         }
00946     }
00947 
00948     return rc;
00949 }
00950 
00963 int io_name_set(io_t *io, const char *name)
00964 {
00965     char *n;
00966 
00967     dbg_err_if (io == NULL);
00968     dbg_err_if (name == NULL);
00969     
00970     n = u_strdup(name);
00971     dbg_err_if(n == NULL);
00972 
00973     U_FREE(io->name);
00974 
00975     io->name = n;
00976 
00977     return 0;
00978 err:
00979     return ~0;
00980 }
00981 
00994 int io_name_get(io_t *io, char *name, size_t sz)
00995 {
00996     size_t min = 0;
00997 
00998     dbg_err_if (io == NULL);
00999     dbg_err_if (io->name == NULL);
01000     dbg_err_if (name == NULL);
01001     dbg_err_if (sz < 2);
01002 
01003     min = MIN(sz-1, strlen(io->name));
01004 
01005     memcpy(name, io->name, min);
01006     name[min] = 0;
01007     
01008     return 0;
01009 err:
01010     return ~0;
01011 }
01012 
01013 /*
01014  * \ingroup basic
01015  * \brief   Return the secure state of the IO object
01016  *  
01017  *  Return 0 if the connection is not secure (i.e. not encrypted) or not zero
01018  *  otherwise
01019  *
01020  * \param io    io object
01021  *
01022  * \return \c 0 for not secure connections, non-zero otherwise
01023  */
01024 int io_is_secure(io_t *io)
01025 {
01026     dbg_err_if(io == NULL);
01027 
01028     return io->is_secure;
01029 err:
01030     return 0;
01031 }
01032 
01033 
01034 /* used by io devices init functions: alloc (used dev_sz block size) 
01035    and initialize an io_t */
01036 int io_prv_create(size_t dev_sz, io_t **pio)
01037 {
01038     io_t *io = NULL;
01039 
01040     dbg_err_if (pio == NULL);
01041 
01042     io = u_zalloc(dev_sz);
01043     dbg_err_if(io == NULL);
01044 
01045     TAILQ_INIT(&io->codec_chain);
01046 
01047     /* set refcnt to 1 */
01048     io->refcnt++;
01049 
01050     /* size of the io device struct (that's also a 'castable' io_t) */
01051     io->size = dev_sz;
01052 
01053     *pio = io;
01054 
01055     return 0;
01056 err:
01057     U_FREE(io);
01058     return -1;
01059 }