source: trunk/amazonbot/amazonbot.py @ 20

Revision 20, 12.8 KB checked in by atzm, 17 years ago (diff)
  • Add !status command and small bug fix
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4__version__ = '$Revision$'
5__author__ = 'Atzm WATANABE <sitosito@p.chan.ne.jp>'
6__date__ = '$Date$'
7__copyright__ = 'Copyright(C) 2006 Atzm WATANABE, all rights reserved.'
8__license__ = 'Python'
9
10import re
11import sys
12import time
13import shlex
14import random
15import getopt
16
17import MeCab
18import nkf
19
20from ircbot import SingleServerIRCBot
21from irclib import nm_to_n
22
23try:
24    set, frozenset
25except NameError:
26    from sets import Set as set, ImmutableSet as frozenset
27
28
29import config; config.init()
30
31import my_amazon
32my_amazon.setLocale(config.get('amazon', 'locale'))
33my_amazon.setLicense(config.get('amazon', 'access_key'))
34
35
36DEBUG_MSG_TO = sys.stderr
37
38
39def uniq(sequence):
40    """リストから重耇を取り陀く (順番が狂うので泚意)
41    """
42    return list(set(sequence))
43
44def unicoding(text):
45    """text を匷制的に unicode オブゞェクトに倉換
46    """
47    if type(text) is unicode:
48        return text
49    return unicode(nkf.nkf('-w', text), 'utf-8')
50
51def ununicoding(text, encoding='iso-2022-jp'):
52    """text を指定された encoding で゚ンコヌドしraw str に匷制倉換
53    """
54    if type(text) is not unicode:
55        return unicoding(text).encode(encoding)
56    return text.encode(encoding)
57
58def mecab_parse(text):
59    """MeCab を䜿っお圢態玠解析し固有名詞ず䞀般名詞だけを抜出する
60    """
61    def choice_nominal(wlist):
62        res = []
63        for word, wtype in wlist:
64            wtypes = wtype.split('-')
65            if '固有名詞' in wtypes or ('名詞' in wtypes and '䞀般' in wtypes):
66                res.append(unicoding(word))
67        return res
68
69    text = ununicoding(text, 'utf-8')
70    result = []
71    tag = MeCab.Tagger('-Ochasen')
72    for line in tag.parse(text).split('\n'):
73        if not line or line == 'EOS':
74            break
75        words = line.split()
76        result.append((words[0], words[-1])) # word, word-type
77
78    result = uniq(choice_nominal(result))
79    return result
80
81def _debug(fmt, *args):
82    if __debug__:
83        timeline = time.strftime("%b %d %T", time.localtime())
84        try:
85            fmt = ununicoding(fmt, 'euc-jp')
86            args = list(args)
87            for i in range(len(args)):
88                if isinstance(args[i], basestring):
89                    args[i] = ununicoding(args[i], 'euc-jp')
90
91            print >> DEBUG_MSG_TO, '(%s) <DEBUG>' % timeline,
92            print >> DEBUG_MSG_TO, fmt % tuple(args)
93
94        except:
95            print >> DEBUG_MSG_TO, '(%s) <DEBUG>' % timeline,
96            print >> DEBUG_MSG_TO, '!! debug message print failed !!'
97
98class AmazonBotBase(SingleServerIRCBot):
99    """アマゟンボットのベヌスクラス
100    単䜓では受け取ったメッセヌゞの圢態玠解析ず名詞抜出たでしかやらない
101    サブクラスで process_keyword を実装しお Amazon ぞク゚リを投げるべし
102
103    サブクラスには onmsg_HOGEHOGE(self, conn, ev, to, args) メ゜ッドを䜜るこずでコマンド远加可胜
104    コマンド曞匏は !HOGEHOGE arg [, arg2, ...] ずなる
105    ヘルプはメ゜ッドに docstring を曞けば OK
106    """
107    def __init__(self):
108        _server = [(config.get('irc', 'server'), config.get('irc', 'port', 'int'))]
109        _nick = config.get('bot', 'nick')
110
111        self._current_lines = 0
112        self._prev_time = time.time() - config.get('freq', 'timeout', 'int')
113        self._silent = False
114        SingleServerIRCBot.__init__(self, _server, _nick, _nick)
115
116    def start(self):
117        try:
118            SingleServerIRCBot.start(self)
119        except KeyboardInterrupt:
120            self.die(ununicoding(config.get('bot', 'bye')))
121
122    def on_welcome(self, c, e):
123        c.join(config.get('irc', 'channel'))
124        _debug('Joined %s', config.get('irc', 'channel'))
125
126    def on_nicknameinuse(self, c, e):
127        c.nick(c.get_nickname() + '_')
128
129    def on_privmsg(self, c, e):
130        return self.on_pubmsg(c, e, to=nm_to_n(e.source()))
131
132    def on_pubmsg(self, c, e, to=config.get('irc', 'channel')):
133        msg = unicoding(e.arguments()[0])
134        _debug('pubmsg incoming "%s", should be reply to %s', msg, to)
135
136        if msg[0] == '!':
137            try:
138                words = shlex.split(ununicoding(msg, 'utf-8')[1:])
139            except:
140                return False
141            if not words:
142                return False
143            method = getattr(self, 'onmsg_%s' % words[0], lambda *arg: False)
144            return method(c, e, to, words[1:]) # words[0] == command name
145
146        # silence
147        self.silence(msg, c, e, to)
148        if self._silent:
149            return False
150
151        # freq_lines
152        self._current_lines += 1
153        _freq_lines = config.get('freq', 'lines', 'int')
154        if _freq_lines:
155            if config.get('freq', 'lines_random', 'boolean'):
156                _freq_lines = random.randint(int(_freq_lines/2)+1, _freq_lines)
157
158            _debug('Line count: now %d, next: %d', self._current_lines, _freq_lines)
159
160            if self._current_lines < _freq_lines:
161                return False
162        self._current_lines = 0
163
164        # freq
165        _current_time = time.time()
166        if _current_time < self._prev_time + config.get('freq', 'timeout', 'int'):
167            cur = time.strftime('%H:%M:%S', time.localtime(_current_time))
168            go = time.strftime('%H:%M:%S', time.localtime(
169                self._prev_time + config.get('freq', 'timeout', 'int')))
170            _debug('Not expired: now %s, be expired at: %s', cur, go)
171            return False
172        self._prev_time = _current_time
173
174        nominals = mecab_parse(msg)
175        if not nominals:
176            _debug("Couldn't find nominal words")
177            return False
178
179        title, url = self.process_keyword(' '.join(nominals))
180        if title and url:
181            content = unicoding(config.get('bot', 'content'))
182            try:
183                message = ununicoding(': '.join([content, title, url]))
184            except UnicodeError, err:
185                # なぜかたたに unicode オブゞェクトを iso-2022-jp で゚ンコヌドできない
186                _debug('%s', str(err))
187                return False
188
189            c.notice(to, message)
190            return True
191        return False
192
193    ACTIVE_PATTERN = re.compile(unicoding(config.get('bot', 'active_pattern')))
194    SILENT_PATTERN = re.compile(unicoding(config.get('bot', 'silent_pattern')))
195    def silence(self, msg, c, e, to):
196        active = self.ACTIVE_PATTERN.search(msg)
197        silent = self.SILENT_PATTERN.search(msg)
198        _debug('ACT_PATT: %s, SIL_PATT: %s', str(active), str(silent))
199
200        if active:
201            self._silent = False
202            c.notice(to, ununicoding(config.get('bot', 'thanks')))
203        elif silent:
204            self._silent = True
205            c.notice(to, ununicoding(config.get('bot', 'sorry')))
206
207    def process_keyword(self, keyword):
208        return [None, None]
209
210    def is_silent(self):
211        return self._silent
212    def get_current_lines(self):
213        return self._current_lines
214    def get_prev_time(self):
215        return self._prev_time
216
217class AmazonBot(AmazonBotBase):
218    """アマゟンボットの実装クラス
219    process_keyword メ゜ッドで Amazon ぞク゚リを投げお結果を返す
220    """
221    _AVAIL_PRODUCT_LINES = {
222        'books-jp': '(和曞, default)',
223        'books-us': '(掋曞)',
224        'music-jp': '(ポピュラヌ音楜)',
225        'classical-jp': '(クラシック音楜)',
226        'dvd-jp': '(DVD)',
227        'vhs-jp': '(ビデオ)',
228        'electronics-jp': '(゚レクトロニクス)',
229        'kitchen-jp': '(ホヌムキッチン)',
230        'software-jp': '(゜フトりェア)',
231        'videogames-jp': '(ゲヌム)',
232        'magazines-jp': '(雑誌)',
233        'toys-jp': '(おもちゃホビヌ)',
234    }
235
236    def __init__(self):
237        AmazonBotBase.__init__(self)
238
239    def get_version(self):
240        return 'AmazonBot by %s, based on python-irclib' % __author__
241
242    def onmsg_s(self, c, e, to, args): return self.onmsg_status(c, e, to, args)
243    def onmsg_status(self, c, e, to, args):
244        """Syntax: !status
245        """
246        _debug('in status command: %s', str(args))
247
248        c.notice(to, 'silent: %s' % self.is_silent())
249        c.notice(to, 'current lines: %d' % self.get_current_lines())
250        c.notice(to, time.strftime('previous time: %b %d %T',
251                                   time.localtime(self.get_prev_time())))
252        return True
253
254    def onmsg_isbn(self, c, e, to, args):
255        """Syntax: !isbn <ISBN number>
256        """
257        return self.onmsg_asin(c, e, to, args)
258    def onmsg_asin(self, c, e, to, args):
259        """Syntax: !asin <ASIN number>
260        """
261        _debug('in asin command: %s', str(args))
262
263        try:
264            data = my_amazon.searchByASIN(args[0])
265        except my_amazon.AmazonError, err:
266            c.notice(to, ununicoding(config.get('bot', 'no_products')))
267            _debug('Caught AmazonError in onmsg_asin: %s', str(err))
268            return False
269        except IndexError, err:
270            c.notice(to, 'Please specify an argument.')
271            return False
272
273        return self._process_onmsg(c, e, to, data)
274
275    def onmsg_k(self, c, e, to, args): return self.onmsg_keyword(c, e, to, args)
276    def onmsg_keyword(self, c, e, to, args):
277        """Syntax: !keyword [-h] [-t type] <keyword1> [, keyword2, ...]
278        """
279        _debug('in keyword command: %s', str(args))
280
281        try:
282            options, rest = getopt.getopt(args, 't:h', ['type=', 'help'])
283        except getopt.GetoptError, err:
284            _debug('Caught GetoptError in onmsg_keyword: %s', str(err))
285            return False
286
287        keyword = ' '.join(rest).strip()
288        product_line = 'books-jp'
289        for opt, val in options:
290            if opt in ['-t', '--type']:
291                if val not in self._AVAIL_PRODUCT_LINES.keys():
292                    c.notice(to, 'Type "%s" is not available.' % val)
293                    return False
294
295                product_line = val
296                break
297
298            elif opt in ['-h', '--help']:
299                _from = nm_to_n(e.source()) # ログを流しおしたうのでヘルプは盎接送信元ぞ
300                c.notice(_from, ununicoding('Available types:'))
301
302                for key, val in self._AVAIL_PRODUCT_LINES.iteritems():
303                    time.sleep(1) # XXX: 連続投皿するず匟かれるこずがあるので暫定察凊
304                    c.notice(_from, ununicoding(' * %s: %s' % (key, val)))
305
306                return True
307
308        if not keyword:
309            c.notice(to, 'Please specify keywords.')
310            return False
311
312        _debug('keyword="%s", product_line=%s', keyword, product_line)
313
314        try:
315            data = my_amazon.searchByKeyword(keyword, product_line=product_line)
316        except my_amazon.AmazonError, err:
317            c.notice(to, ununicoding(config.get('bot', 'no_products')))
318            _debug('Caught AmazonError in onmsg_amazon: %s', str(err))
319            return False
320
321        return self._process_onmsg(c, e, to, data)
322
323    def onmsg_h(self, c, e, to, args): return self.onmsg_help(c, e, to, args)
324    def onmsg_help(self, c, e, to, args):
325        """Syntax: !help
326        """
327        _debug('in help command: %s', str(args))
328
329        _from = nm_to_n(e.source()) # ログを流しおしたうのでヘルプは盎接送信元ぞ
330        c.notice(_from, self.get_version())
331
332        docs = []
333        for key in dir(self):
334            val = getattr(self, key, '')
335            _debug('key=%s, val=%s', key, str(val))
336
337            if key[:6] != 'onmsg_':
338                continue
339
340            doc = val.__doc__
341            if doc:
342                doc = doc.strip()
343                if not doc:
344                    continue
345                time.sleep(1) # XXX: 連続投皿するず匟かれるっぜいので暫定察凊
346                c.notice(_from, doc)
347
348        return True
349
350    def _process_onmsg(self, c, e, to, data):
351        if type(data.Details) is not list:
352            data.Details = [data.Details]
353
354        detail = random.choice(data.Details)
355        title = ununicoding(detail.ProductName)
356        url = ununicoding(detail.URL)
357        c.notice(to, '%(title)s: %(url)s' % locals())
358
359        return True
360
361    def process_keyword(self, keyword):
362        keyword = ununicoding(keyword, 'utf-8')
363        _debug('KEYWORD: %s', keyword)
364
365        try:
366            data = my_amazon.searchByBlended(keyword)
367            if type(data.ProductLine) is not type([]):
368                data.ProductLine = [data.ProductLine]
369        except my_amazon.AmazonError, err:
370            _debug('Caught AmazonError: %s', str(err))
371            return [None, None]
372
373        product_line = random.choice(data.ProductLine)
374        detail = random.choice(product_line.ProductInfo.Details)
375
376        url = unicoding(getattr(detail, 'URL', None))
377        product_name = unicoding(getattr(detail, 'ProductName', None))
378
379        return [product_name, url]
380
381if __name__ == '__main__':
382    bot = AmazonBot()
383    bot.start()
384    print '> Bye ;)'
Note: See TracBrowser for help on using the repository browser.