Package logilab :: Package common :: Module shellutils
[frames] | no frames]

Source Code for Module logilab.common.shellutils

  1  # copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  2  # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  3  # 
  4  # This file is part of logilab-common. 
  5  # 
  6  # logilab-common is free software: you can redistribute it and/or modify it under 
  7  # the terms of the GNU Lesser General Public License as published by the Free 
  8  # Software Foundation, either version 2.1 of the License, or (at your option) any 
  9  # later version. 
 10  # 
 11  # logilab-common is distributed in the hope that it will be useful, but WITHOUT 
 12  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
 13  # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 14  # details. 
 15  # 
 16  # You should have received a copy of the GNU Lesser General Public License along 
 17  # with logilab-common.  If not, see <http://www.gnu.org/licenses/>. 
 18  """shell/term utilities, useful to write some python scripts instead of shell 
 19  scripts. 
 20  """ 
 21  __docformat__ = "restructuredtext en" 
 22   
 23  import os 
 24  import glob 
 25  import shutil 
 26  import stat 
 27  import sys 
 28  import tempfile 
 29  import time 
 30  import fnmatch 
 31  import errno 
 32  from os.path import exists, isdir, islink, basename, join 
 33   
 34  from logilab.common import STD_BLACKLIST, _handle_blacklist 
 35  from logilab.common.compat import raw_input 
 36  from logilab.common.compat import str_to_bytes 
 37   
 38  try: 
 39      from logilab.common.proc import ProcInfo, NoSuchProcess 
 40  except ImportError: 
 41      # windows platform 
42 - class NoSuchProcess(Exception): pass
43
44 - def ProcInfo(pid):
45 raise NoSuchProcess()
46 47
48 -class tempdir(object):
49
50 - def __enter__(self):
51 self.path = tempfile.mkdtemp() 52 return self.path
53
54 - def __exit__(self, exctype, value, traceback):
55 # rmtree in all cases 56 shutil.rmtree(self.path) 57 return traceback is None
58 59
60 -class pushd(object):
61 - def __init__(self, directory):
62 self.directory = directory
63
64 - def __enter__(self):
65 self.cwd = os.getcwd() 66 os.chdir(self.directory) 67 return self.directory
68
69 - def __exit__(self, exctype, value, traceback):
70 os.chdir(self.cwd)
71 72
73 -def chown(path, login=None, group=None):
74 """Same as `os.chown` function but accepting user login or group name as 75 argument. If login or group is omitted, it's left unchanged. 76 77 Note: you must own the file to chown it (or be root). Otherwise OSError is raised. 78 """ 79 if login is None: 80 uid = -1 81 else: 82 try: 83 uid = int(login) 84 except ValueError: 85 import pwd # Platforms: Unix 86 uid = pwd.getpwnam(login).pw_uid 87 if group is None: 88 gid = -1 89 else: 90 try: 91 gid = int(group) 92 except ValueError: 93 import grp 94 gid = grp.getgrnam(group).gr_gid 95 os.chown(path, uid, gid)
96
97 -def mv(source, destination, _action=shutil.move):
98 """A shell-like mv, supporting wildcards. 99 """ 100 sources = glob.glob(source) 101 if len(sources) > 1: 102 assert isdir(destination) 103 for filename in sources: 104 _action(filename, join(destination, basename(filename))) 105 else: 106 try: 107 source = sources[0] 108 except IndexError: 109 raise OSError('No file matching %s' % source) 110 if isdir(destination) and exists(destination): 111 destination = join(destination, basename(source)) 112 try: 113 _action(source, destination) 114 except OSError, ex: 115 raise OSError('Unable to move %r to %r (%s)' % ( 116 source, destination, ex))
117
118 -def rm(*files):
119 """A shell-like rm, supporting wildcards. 120 """ 121 for wfile in files: 122 for filename in glob.glob(wfile): 123 if islink(filename): 124 os.remove(filename) 125 elif isdir(filename): 126 shutil.rmtree(filename) 127 else: 128 os.remove(filename)
129
130 -def cp(source, destination):
131 """A shell-like cp, supporting wildcards. 132 """ 133 mv(source, destination, _action=shutil.copy)
134
135 -def find(directory, exts, exclude=False, blacklist=STD_BLACKLIST):
136 """Recursively find files ending with the given extensions from the directory. 137 138 :type directory: str 139 :param directory: 140 directory where the search should start 141 142 :type exts: basestring or list or tuple 143 :param exts: 144 extensions or lists or extensions to search 145 146 :type exclude: boolean 147 :param exts: 148 if this argument is True, returning files NOT ending with the given 149 extensions 150 151 :type blacklist: list or tuple 152 :param blacklist: 153 optional list of files or directory to ignore, default to the value of 154 `logilab.common.STD_BLACKLIST` 155 156 :rtype: list 157 :return: 158 the list of all matching files 159 """ 160 if isinstance(exts, basestring): 161 exts = (exts,) 162 if exclude: 163 def match(filename, exts): 164 for ext in exts: 165 if filename.endswith(ext): 166 return False 167 return True
168 else: 169 def match(filename, exts): 170 for ext in exts: 171 if filename.endswith(ext): 172 return True 173 return False 174 files = [] 175 for dirpath, dirnames, filenames in os.walk(directory): 176 _handle_blacklist(blacklist, dirnames, filenames) 177 # don't append files if the directory is blacklisted 178 dirname = basename(dirpath) 179 if dirname in blacklist: 180 continue 181 files.extend([join(dirpath, f) for f in filenames if match(f, exts)]) 182 return files 183 184
185 -def globfind(directory, pattern, blacklist=STD_BLACKLIST):
186 """Recursively finds files matching glob `pattern` under `directory`. 187 188 This is an alternative to `logilab.common.shellutils.find`. 189 190 :type directory: str 191 :param directory: 192 directory where the search should start 193 194 :type pattern: basestring 195 :param pattern: 196 the glob pattern (e.g *.py, foo*.py, etc.) 197 198 :type blacklist: list or tuple 199 :param blacklist: 200 optional list of files or directory to ignore, default to the value of 201 `logilab.common.STD_BLACKLIST` 202 203 :rtype: iterator 204 :return: 205 iterator over the list of all matching files 206 """ 207 for curdir, dirnames, filenames in os.walk(directory): 208 _handle_blacklist(blacklist, dirnames, filenames) 209 for fname in fnmatch.filter(filenames, pattern): 210 yield join(curdir, fname)
211
212 -def unzip(archive, destdir):
213 import zipfile 214 if not exists(destdir): 215 os.mkdir(destdir) 216 zfobj = zipfile.ZipFile(archive) 217 for name in zfobj.namelist(): 218 if name.endswith('/'): 219 os.mkdir(join(destdir, name)) 220 else: 221 outfile = open(join(destdir, name), 'wb') 222 outfile.write(zfobj.read(name)) 223 outfile.close()
224
225 -class Execute:
226 """This is a deadlock safe version of popen2 (no stdin), that returns 227 an object with errorlevel, out and err. 228 """ 229
230 - def __init__(self, command):
231 outfile = tempfile.mktemp() 232 errfile = tempfile.mktemp() 233 self.status = os.system("( %s ) >%s 2>%s" % 234 (command, outfile, errfile)) >> 8 235 self.out = open(outfile, "r").read() 236 self.err = open(errfile, "r").read() 237 os.remove(outfile) 238 os.remove(errfile)
239
240 -def acquire_lock(lock_file, max_try=10, delay=10, max_delay=3600):
241 """Acquire a lock represented by a file on the file system 242 243 If the process written in lock file doesn't exist anymore, we remove the 244 lock file immediately 245 If age of the lock_file is greater than max_delay, then we raise a UserWarning 246 """ 247 count = abs(max_try) 248 while count: 249 try: 250 fd = os.open(lock_file, os.O_EXCL | os.O_RDWR | os.O_CREAT) 251 os.write(fd, str_to_bytes(str(os.getpid())) ) 252 os.close(fd) 253 return True 254 except OSError, e: 255 if e.errno == errno.EEXIST: 256 try: 257 fd = open(lock_file, "r") 258 pid = int(fd.readline()) 259 pi = ProcInfo(pid) 260 age = (time.time() - os.stat(lock_file)[stat.ST_MTIME]) 261 if age / max_delay > 1 : 262 raise UserWarning("Command '%s' (pid %s) has locked the " 263 "file '%s' for %s minutes" 264 % (pi.name(), pid, lock_file, age/60)) 265 except UserWarning: 266 raise 267 except NoSuchProcess: 268 os.remove(lock_file) 269 except Exception: 270 # The try block is not essential. can be skipped. 271 # Note: ProcInfo object is only available for linux 272 # process information are not accessible... 273 # or lock_file is no more present... 274 pass 275 else: 276 raise 277 count -= 1 278 time.sleep(delay) 279 else: 280 raise Exception('Unable to acquire %s' % lock_file)
281
282 -def release_lock(lock_file):
283 """Release a lock represented by a file on the file system.""" 284 os.remove(lock_file)
285 286
287 -class ProgressBar(object):
288 """A simple text progression bar.""" 289
290 - def __init__(self, nbops, size=20, stream=sys.stdout, title=''):
291 if title: 292 self._fstr = '\r%s [%%-%ss]' % (title, int(size)) 293 else: 294 self._fstr = '\r[%%-%ss]' % int(size) 295 self._stream = stream 296 self._total = nbops 297 self._size = size 298 self._current = 0 299 self._progress = 0 300 self._current_text = None 301 self._last_text_write_size = 0
302
303 - def _get_text(self):
304 return self._current_text
305
306 - def _set_text(self, text=None):
307 if text != self._current_text: 308 self._current_text = text 309 self.refresh()
310
311 - def _del_text(self):
312 self.text = None
313 314 text = property(_get_text, _set_text, _del_text) 315
316 - def update(self):
317 """Update the progression bar.""" 318 self._current += 1 319 progress = int((float(self._current)/float(self._total))*self._size) 320 if progress > self._progress: 321 self._progress = progress 322 self.refresh()
323
324 - def refresh(self):
325 """Refresh the progression bar display.""" 326 self._stream.write(self._fstr % ('.' * min(self._progress, self._size)) ) 327 if self._last_text_write_size or self._current_text: 328 template = ' %%-%is' % (self._last_text_write_size) 329 text = self._current_text 330 if text is None: 331 text = '' 332 self._stream.write(template % text) 333 self._last_text_write_size = len(text.rstrip()) 334 self._stream.flush()
335
336 - def finish(self):
337 self._stream.write('\n') 338 self._stream.flush()
339 340
341 -class DummyProgressBar(object):
342 __slot__ = ('text',) 343
344 - def refresh(self):
345 pass
346 - def update(self):
347 pass
348 - def finish(self):
349 pass
350 351 352 _MARKER = object()
353 -class progress(object):
354
355 - def __init__(self, nbops=_MARKER, size=_MARKER, stream=_MARKER, title=_MARKER, enabled=True):
356 self.nbops = nbops 357 self.size = size 358 self.stream = stream 359 self.title = title 360 self.enabled = enabled
361
362 - def __enter__(self):
363 if self.enabled: 364 kwargs = {} 365 for attr in ('nbops', 'size', 'stream', 'title'): 366 value = getattr(self, attr) 367 if value is not _MARKER: 368 kwargs[attr] = value 369 self.pb = ProgressBar(**kwargs) 370 else: 371 self.pb = DummyProgressBar() 372 return self.pb
373
374 - def __exit__(self, exc_type, exc_val, exc_tb):
375 self.pb.finish()
376
377 -class RawInput(object):
378
379 - def __init__(self, input=None, printer=None):
380 self._input = input or raw_input 381 self._print = printer
382
383 - def ask(self, question, options, default):
384 assert default in options 385 choices = [] 386 for option in options: 387 if option == default: 388 label = option[0].upper() 389 else: 390 label = option[0].lower() 391 if len(option) > 1: 392 label += '(%s)' % option[1:].lower() 393 choices.append((option, label)) 394 prompt = "%s [%s]: " % (question, 395 '/'.join([opt[1] for opt in choices])) 396 tries = 3 397 while tries > 0: 398 answer = self._input(prompt).strip().lower() 399 if not answer: 400 return default 401 possible = [option for option, label in choices 402 if option.lower().startswith(answer)] 403 if len(possible) == 1: 404 return possible[0] 405 elif len(possible) == 0: 406 msg = '%s is not an option.' % answer 407 else: 408 msg = ('%s is an ambiguous answer, do you mean %s ?' % ( 409 answer, ' or '.join(possible))) 410 if self._print: 411 self._print(msg) 412 else: 413 print msg 414 tries -= 1 415 raise Exception('unable to get a sensible answer')
416
417 - def confirm(self, question, default_is_yes=True):
418 default = default_is_yes and 'y' or 'n' 419 answer = self.ask(question, ('y', 'n'), default) 420 return answer == 'y'
421 422 ASK = RawInput() 423 424
425 -def getlogin():
426 """avoid using os.getlogin() because of strange tty / stdin problems 427 (man 3 getlogin) 428 Another solution would be to use $LOGNAME, $USER or $USERNAME 429 """ 430 if sys.platform != 'win32': 431 import pwd # Platforms: Unix 432 return pwd.getpwuid(os.getuid())[0] 433 else: 434 return os.environ['USERNAME']
435