Python Fetion 
Python November 5th, 2008
项目地址在: http://git.lazytech.info/?p=python-fetion.git
首先感谢 nathan’s space 的飞信协议分析以及 open fetion 和 fetion protocol plugin for pidgin 这两个项目的代码对我的启发
介绍:
fetion.py 是飞信 HTTP 的实现, 可以获得好友列表和发送短信
twisted-fetion 下的 fetionclient.py 是飞信的 TCP socket 的实现, 可以用来收发短信, 支持外部程序通过 HTTP POST 的方式来发送短信
fetion.py 的使用:
1 | python fetion.py -m 159xxxxxxxx -t "sip:XXXXXXXXX@fetion.com.cn;p=XXX" -b "hello world" |
fetionclient.py 的使用:
1 | python fetionclient.py -m 159xxxxxxxx |
启动服务器之后, 可以通过 curl 来发起个 HTTP POST 的请求来发送短信, 或者是直接访问 http://localhost:8765 通过 Web 界面来发送
1 2 | # the to and body parameters should be quoted curl -d "to=sip%3AXXXXXXXXX%40fetion.com.cn%3Bp%3DXXXX&body=hello%20world" "http://localhost:8765" |
项目的起由是因为小员一直想要个服务器状态短信通知的程序, 短信网关没钱买, 只好打飞信的主意了… 刚好最近公司的项目结了, 有时间可以挥霍, 闲得蛋疼就开始动手了. 一开始我先把 PHP 的 open fetion 调通, 然后小员把它翻译成了 python 的版本, 最后我再整了个基于 twisted 的版本出来.
目前项目是满足了我们的所有需求了, 所以版本也许会一直停留 0.1 上, 如果不继续蛋疼的话…
Gmail Migration Script 
Python October 15th, 2008
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 | #!/usr/bin/env python import cPickle as pickle import imaplib import sys from sexp import scan_sexp def savetemp(name, value): f = open(name, 'w') pickle.dump(value, f) f.close() def loadtemp(name): try: f = open(name, 'r') value = pickle.load(f) f.close() return value except: return {} HOST = 'imap.gmail.com' PORT = 993 SOURCE_USER = 'XXX@gmail.com' SOURCE_PASS = 'XXX' DEST_USER = 'XXX@gmail.com' DEST_PASS = 'XXX' TEMPFILE = 'temp' print 'connecting...' source = imaplib.IMAP4_SSL(HOST, PORT) print 'logging in...' source.login(SOURCE_USER, SOURCE_PASS) print 'connecting...' dest = imaplib.IMAP4_SSL(HOST, PORT) print 'logging in...' dest.login(DEST_USER, DEST_PASS) labels = { '[Gmail]/All Mail': '[Gmail]/All Mail', } temp = loadtemp(TEMPFILE) for slabel, dlabel in labels.items(): # creating temp for slabel if not temp.has_key(slabel): temp[slabel] = {'dest_uids': {}, 'dest_info': {}, 'source_uids': {}} # printing temp info for slabel print 'loaded for %s: "dest_uids": %d, "dest_info": %d, "source_uids": %d' % (slabel, len(temp[slabel]['dest_uids']), len(temp[slabel]['dest_info']), len(temp[slabel]['source_uids'])) print 'selecting source folder %s...' % slabel if source.select(slabel)[0] == 'NO': print 'error: select failed' continue else: print 'selecting dest folder %s...' % dlabel if dest.select(dlabel)[0] == 'NO': print 'dest folder not found; creating...' if dest.create(dlabel)[0]=='NO' or dest.select(dlabel)[0]=='NO': print 'error: could not create folder' continue print 'analyzing existing messages...' uids = dest.search(None, 'ALL')[1][0].split() if uids: i = 1 length = len(uids) for uid in uids: # progress sys.stdout.write('\r%d/%d' % (i, length)) sys.stdout.flush() i += 1 if not temp[slabel]['dest_uids'].has_key(uid): try: msg_id = scan_sexp(dest.fetch(uid, 'ENVELOPE')[1][0])[1][1][-1] temp[slabel]['dest_uids'][uid] = True temp[slabel]['dest_info'][msg_id] = True except: pass print '\nwriting dest_uids and dest_info temp...' savetemp(TEMPFILE, temp) print 'migrating...' uids = source.search(None, 'ALL')[1][0].split() if uids: i = 1 length = len(uids) for uid in uids: # progress sys.stdout.write('\r%d/%d' % (i, length)) sys.stdout.flush() i += 1 if not temp[slabel]['source_uids'].has_key(uid): try: msg_id = scan_sexp(source.fetch(uid, 'ENVELOPE')[1][0])[1][1][-1] if not temp[slabel]['dest_info'].has_key(msg_id): msg = source.fetch(uid, '(RFC822 FLAGS INTERNALDATE)') dest.append(dlabel, \ imaplib.ParseFlags(msg[1][1][:-1]), \ imaplib.Internaldate2tuple(msg[1][1][:-1]), \ msg[1][0][1]) temp[slabel]['source_uids'][uid] = True else: temp[slabel]['source_uids'][uid] = True except: pass print '\nwriting dest_uids and dest_info temp...' savetemp(TEMPFILE, temp) try: source.close() dest.close() except: pass print '\ndone' |
Tags: python gmail migration
Monitoring Directory 
Leopard, Python September 5th, 2008
Damn! The project sucks! We have to work on a server far away from American. Poor VPN connection, high network delay… Even though we have a local copy, no server environment up…
But Xupeng and me found a good solution for it yesterday. We ran a program for monitoring the code directory’s changes. When we modify the code, it well transfer the modified file to remote server automatically. So we just need to modify the code, and refresh the browser, haha ![]()
Xupeng has written a inotify program for doing monitoring on Linux. But my Leopard have no good file system events mechanism for it. FSEvents and kqueue can’t show what changed within the directory, so I need do some extra works for indexing the directory’s files for comparing.
Here are the code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | from FSEvents import * import objc import os import sys import stat def fsevents_callback(streamRef, clientInfo, numEvents, eventPaths, eventMasks, eventIDs): full_path = clientInfo global previous_mtimes for i in range(numEvents): path = eventPaths[i] if path[-1] == '/': path = path[:-1] temp_mtimes = {} for dirpath, dirnames, filenames in os.walk(full_path): for filename in filenames: filename = os.path.join(dirpath, filename) new_mtime = os.path.getmtime(filename) new_size = os.path.getsize(filename) temp_mtimes[filename] = (new_mtime, new_size) if filename not in previous_mtimes: # Do some actions elif new_mtime > previous_mtimes[filename][0] and new_size != previous_mtimes[filename][1]: # Do some actions previous_mtimes = temp_mtimes def my_FSEventStreamCreate(path): streamRef = FSEventStreamCreate(kCFAllocatorDefault, fsevents_callback, path, [path], kFSEventStreamEventIdSinceNow, 1.0, 0) if streamRef is None: return None return streamRef if __name__ == "__main__": full_path = '/path/to/code/' previous_mtimes = {} # Create a files index for target directory for dirpath, dirnames, filenames in os.walk(full_path): for filename in filenames: filename = os.path.join(dirpath, filename) previous_mtimes[filename] = (os.path.getmtime(filename), os.path.getsize(filename)) streamRef = my_FSEventStreamCreate(full_path) FSEventStreamScheduleWithRunLoop(streamRef, CFRunLoopGetCurrent(), kCFRunLoopDefaultMode) startedOK = FSEventStreamStart(streamRef) if not startedOK: exit() # Run CFRunLoopRun() #Stop / Invalidate / Release FSEventStreamStop(streamRef) FSEventStreamInvalidate(streamRef) #FSEventStreamRelease(streamRef) |