Python Fetion chinese

Python November 5th, 2008

项目地址在: http://git.lazytech.info/?p=python-fetion.git

首先感谢 nathan’s space 的飞信协议分析以及 open fetionfetion protocol plugin for pidgin 这两个项目的代码对我的启发 :)

介绍:
fetion.py 是飞信 HTTP 的实现, 可以获得好友列表和发送短信
twisted-fetion 下的 fetionclient.py 是飞信的 TCP socket 的实现, 可以用来收发短信, 支持外部程序通过 HTTP POST 的方式来发送短信

fetion.py 的使用:

1
python fetion.py -m 159xxxxxxxx -t "sip:XXXXXXXXX@fetion.com.cn;p=XXX" -b "hello world"

fetionclient.py 的使用:

1
python fetionclient.py -m 159xxxxxxxx

启动服务器之后, 可以通过 curl 来发起个 HTTP POST 的请求来发送短信, 或者是直接访问 http://localhost:8765 通过 Web 界面来发送

1
2
# the to and body parameters should be quoted
curl -d "to=sip%3AXXXXXXXXX%40fetion.com.cn%3Bp%3DXXXX&body=hello%20world" "http://localhost:8765"

项目的起由是因为小员一直想要个服务器状态短信通知的程序, 短信网关没钱买, 只好打飞信的主意了… 刚好最近公司的项目结了, 有时间可以挥霍, 闲得蛋疼就开始动手了. 一开始我先把 PHP 的 open fetion 调通, 然后小员把它翻译成了 python 的版本, 最后我再整了个基于 twisted 的版本出来.

目前项目是满足了我们的所有需求了, 所以版本也许会一直停留 0.1 上, 如果不继续蛋疼的话…

Tags: ,

Gmail Migration Script chinese

Python October 15th, 2008

?Download sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python
 
import cPickle as pickle
import imaplib
import sys
 
from sexp import scan_sexp
 
def savetemp(name, value):
    f = open(name, 'w')
    pickle.dump(value, f)
    f.close()
 
def loadtemp(name):
    try:
        f = open(name, 'r')
        value = pickle.load(f)
        f.close()
        return value
    except:
        return {}
 
HOST = 'imap.gmail.com'
PORT = 993
SOURCE_USER = 'XXX@gmail.com'
SOURCE_PASS = 'XXX'
DEST_USER = 'XXX@gmail.com'
DEST_PASS = 'XXX'
TEMPFILE = 'temp'
 
print 'connecting...'
source = imaplib.IMAP4_SSL(HOST, PORT)
print 'logging in...'
source.login(SOURCE_USER, SOURCE_PASS)
print 'connecting...'
dest = imaplib.IMAP4_SSL(HOST, PORT)
print 'logging in...'
dest.login(DEST_USER, DEST_PASS)
 
labels = {
    '[Gmail]/All Mail': '[Gmail]/All Mail',
}
 
temp = loadtemp(TEMPFILE)
 
for slabel, dlabel in labels.items():
    # creating temp for slabel
    if not temp.has_key(slabel):
        temp[slabel] = {'dest_uids': {}, 'dest_info': {}, 'source_uids': {}}
 
    # printing temp info for slabel
    print 'loaded for %s: "dest_uids": %d, "dest_info": %d, "source_uids": %d' % (slabel,
                                                                                  len(temp[slabel]['dest_uids']),
                                                                                  len(temp[slabel]['dest_info']),
                                                                                  len(temp[slabel]['source_uids']))
 
    print 'selecting source folder %s...' % slabel
    if source.select(slabel)[0] == 'NO':
        print 'error: select failed'
        continue
    else:
        print 'selecting dest folder %s...' % dlabel
        if dest.select(dlabel)[0] == 'NO':
            print 'dest folder not found; creating...'
            if dest.create(dlabel)[0]=='NO' or dest.select(dlabel)[0]=='NO':
                print 'error: could not create folder'
                continue
 
        print 'analyzing existing messages...'
        uids = dest.search(None, 'ALL')[1][0].split()
        if uids:
            i = 1
            length = len(uids)
            for uid in uids:
                # progress
                sys.stdout.write('\r%d/%d' % (i, length))
                sys.stdout.flush()
                i += 1
 
                if not temp[slabel]['dest_uids'].has_key(uid):
                    try:
                        msg_id = scan_sexp(dest.fetch(uid, 'ENVELOPE')[1][0])[1][1][-1]
                        temp[slabel]['dest_uids'][uid] = True
                        temp[slabel]['dest_info'][msg_id] = True
                    except:
                        pass
 
        print '\nwriting dest_uids and dest_info temp...'
        savetemp(TEMPFILE, temp)
 
        print 'migrating...'
        uids = source.search(None, 'ALL')[1][0].split()
        if uids:
            i = 1
            length = len(uids)
            for uid in uids:
                # progress
                sys.stdout.write('\r%d/%d' % (i, length))
                sys.stdout.flush()
                i += 1
 
                if not temp[slabel]['source_uids'].has_key(uid):
                    try:
                        msg_id = scan_sexp(source.fetch(uid, 'ENVELOPE')[1][0])[1][1][-1]
                        if not temp[slabel]['dest_info'].has_key(msg_id):
                            msg = source.fetch(uid, '(RFC822 FLAGS INTERNALDATE)')
                            dest.append(dlabel, \
                                        imaplib.ParseFlags(msg[1][1][:-1]), \
                                        imaplib.Internaldate2tuple(msg[1][1][:-1]), \
                                        msg[1][0][1])
                            temp[slabel]['source_uids'][uid] = True
                        else:
                            temp[slabel]['source_uids'][uid] = True
                    except:
                        pass
 
        print '\nwriting dest_uids and dest_info temp...'
        savetemp(TEMPFILE, temp)
 
try:
    source.close()
    dest.close()
except:
    pass
 
print '\ndone'

Read the rest of this entry »

Tags:

Monitoring Directory chinese

Leopard, Python September 5th, 2008

Damn! The project sucks! We have to work on a server far away from American. Poor VPN connection, high network delay… Even though we have a local copy, no server environment up…
But Xupeng and me found a good solution for it yesterday. We ran a program for monitoring the code directory’s changes. When we modify the code, it well transfer the modified file to remote server automatically. So we just need to modify the code, and refresh the browser, haha :)
Xupeng has written a inotify program for doing monitoring on Linux. But my Leopard have no good file system events mechanism for it. FSEvents and kqueue can’t show what changed within the directory, so I need do some extra works for indexing the directory’s files for comparing.
Here are the code:

?View Code PYTHON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from FSEvents import *
import objc
import os
import sys
import stat
 
def fsevents_callback(streamRef, clientInfo, numEvents, eventPaths, eventMasks, eventIDs):
    full_path = clientInfo
    global previous_mtimes
 
    for i in range(numEvents):
        path = eventPaths[i]
        if path[-1] == '/':
            path = path[:-1]
 
        temp_mtimes = {}
        for dirpath, dirnames, filenames in os.walk(full_path):
            for filename in filenames:
                filename = os.path.join(dirpath, filename)
                new_mtime = os.path.getmtime(filename)
                new_size = os.path.getsize(filename)
                temp_mtimes[filename] = (new_mtime, new_size)
                if filename not in previous_mtimes:                                                   
                    # Do some actions
                elif new_mtime > previous_mtimes[filename][0] and new_size != previous_mtimes[filename][1]:
                    # Do some actions
 
        previous_mtimes = temp_mtimes
 
def my_FSEventStreamCreate(path):
    streamRef = FSEventStreamCreate(kCFAllocatorDefault,
                                    fsevents_callback,
                                    path,
                                    [path],
                                    kFSEventStreamEventIdSinceNow,
                                    1.0,
                                    0)
    if streamRef is None:
        return None
 
    return streamRef
 
if __name__ == "__main__":
    full_path = '/path/to/code/'
    previous_mtimes = {}
 
    # Create a files index for target directory
    for dirpath, dirnames, filenames in os.walk(full_path):
        for filename in filenames:
            filename = os.path.join(dirpath, filename)
            previous_mtimes[filename] =  (os.path.getmtime(filename), os.path.getsize(filename))
 
    streamRef = my_FSEventStreamCreate(full_path)
    FSEventStreamScheduleWithRunLoop(streamRef, CFRunLoopGetCurrent(), kCFRunLoopDefaultMode)
    startedOK = FSEventStreamStart(streamRef)
 
    if not startedOK:
        exit()
 
    # Run
    CFRunLoopRun()
 
    #Stop / Invalidate / Release
    FSEventStreamStop(streamRef)
    FSEventStreamInvalidate(streamRef)
    #FSEventStreamRelease(streamRef)

Tags: ,