Changeset 253 for etherws/trunk
- Timestamp:
- 10/09/13 20:58:55 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
etherws/trunk/etherws.py
r252 r253 120 120 return self.age > self._ageout 121 121 122 def __init__(self, ageout, debug =False):122 def __init__(self, ageout, debug): 123 123 self._ageout = ageout 124 124 self._debug = debug … … 191 191 return cmp(x.number, y.number) 192 192 193 def __init__(self, fdb, debug =False):193 def __init__(self, fdb, debug): 194 194 self.fdb = fdb 195 195 self._debug = debug … … 345 345 346 346 347 class EtherWebSocketHandler(DebugMixIn, BasicAuthMixIn, WebSocketHandler):347 class ServerHandler(DebugMixIn, BasicAuthMixIn, WebSocketHandler): 348 348 IFTYPE = 'server' 349 350 def __init__(self, app, req, switch, htpasswd=None, debug=False): 351 super(EtherWebSocketHandler, self).__init__(app, req) 349 IFOP_ALLOWED = False 350 351 def __init__(self, app, req, switch, htpasswd, debug): 352 super(ServerHandler, self).__init__(app, req) 352 353 self._switch = switch 353 354 self._htpasswd = htpasswd … … 372 373 373 374 374 class NetdevHandler(DebugMixIn): 375 IFTYPE = 'netdev' 376 READ_SIZE = 65535 377 ETH_P_ALL = 0x0003 # from <linux/if_ether.h> 378 379 def __init__(self, ioloop, switch, dev, debug=False): 375 class BaseClientHandler(DebugMixIn): 376 IFTYPE = 'baseclient' 377 IFOP_ALLOWED = False 378 379 def __init__(self, ioloop, switch, target, debug, *args, **kwargs): 380 380 self._ioloop = ioloop 381 381 self._switch = switch 382 self._ dev = dev382 self._target = target 383 383 self._debug = debug 384 self._sock = None 385 386 @property 387 def target(self): 388 return self._dev 389 390 @property 391 def closed(self): 392 return not self._sock 384 self._args = args 385 self._kwargs = kwargs 386 self._device = None 393 387 394 388 @property 395 389 def address(self): 396 if self.closed: 397 raise ValueError('I/O operation on closed netdev') 398 return '' 390 raise NotImplementedError('unsupported') 399 391 400 392 @property 401 393 def netmask(self): 402 if self.closed: 403 raise ValueError('I/O operation on closed netdev') 404 return '' 394 raise NotImplementedError('unsupported') 405 395 406 396 @property 407 397 def mtu(self): 408 if self.closed: 409 raise ValueError('I/O operation on closed netdev') 410 return '' 398 raise NotImplementedError('unsupported') 411 399 412 400 @address.setter 413 401 def address(self, address): 414 if self.closed:415 raise ValueError('I/O operation on closed netdev')416 402 raise NotImplementedError('unsupported') 417 403 418 404 @netmask.setter 419 405 def netmask(self, netmask): 420 if self.closed:421 raise ValueError('I/O operation on closed netdev')422 406 raise NotImplementedError('unsupported') 423 407 424 408 @mtu.setter 425 409 def mtu(self, mtu): 426 if self.closed:427 raise ValueError('I/O operation on closed netdev')428 410 raise NotImplementedError('unsupported') 429 411 430 412 def open(self): 431 if not self.closed: 432 raise ValueError('Already opened') 433 self._sock = socket.socket( 434 socket.PF_PACKET, socket.SOCK_RAW, socket.htons(self.ETH_P_ALL)) 435 self._sock.bind((self._dev, self.ETH_P_ALL)) 436 self._ioloop.add_handler(self.fileno(), self, self._ioloop.READ) 437 return self._switch.register_port(self) 413 raise NotImplementedError('unsupported') 414 415 def write_message(self, message, binary=False): 416 raise NotImplementedError('unsupported') 417 418 def read(self): 419 raise NotImplementedError('unsupported') 420 421 @property 422 def target(self): 423 return self._target 424 425 @property 426 def device(self): 427 return self._device 428 429 @property 430 def closed(self): 431 return not self.device 438 432 439 433 def close(self): 440 434 if self.closed: 441 raise ValueError('I/O operation on closed netdev') 442 self._switch.unregister_port(self) 443 self._ioloop.remove_handler(self.fileno()) 444 self._sock.close() 445 self._sock = None 435 raise ValueError('I/O operation on closed %s' % self.IFTYPE) 436 self.leave_switch() 437 self.unregister_device() 438 self.dprintf('disconnected: %s\n', lambda: self.target) 439 440 def register_device(self, device): 441 self._device = device 442 443 def unregister_device(self): 444 self._device.close() 445 self._device = None 446 446 447 447 def fileno(self): 448 448 if self.closed: 449 raise ValueError('I/O operation on closed netdev') 450 return self._sock.fileno() 451 452 def write_message(self, message, binary=False): 453 if self.closed: 454 raise ValueError('I/O operation on closed netdev') 455 self._sock.sendall(message) 449 raise ValueError('I/O operation on closed %s' % self.IFTYPE) 450 return self.device.fileno() 456 451 457 452 def __call__(self, fd, events): 458 453 try: 459 self._switch.receive(self, EthernetFrame(self._read())) 460 return 461 except: 462 traceback.print_exc() 463 self.close() 464 465 def _read(self): 466 if self.closed: 467 raise ValueError('I/O operation on closed netdev') 468 buf = [] 469 while True: 470 buf.append(self._sock.recv(self.READ_SIZE)) 471 if len(buf[-1]) < self.READ_SIZE: 472 break 473 return ''.join(buf) 474 475 476 class TapHandler(DebugMixIn): 477 IFTYPE = 'tap' 478 READ_SIZE = 65535 479 480 def __init__(self, ioloop, switch, dev, debug=False): 481 self._ioloop = ioloop 482 self._switch = switch 483 self._dev = dev 484 self._debug = debug 485 self._tap = None 486 487 @property 488 def target(self): 489 if self.closed: 490 return self._dev 491 return self._tap.name 492 493 @property 494 def closed(self): 495 return not self._tap 496 497 @property 498 def address(self): 499 if self.closed: 500 raise ValueError('I/O operation on closed tap') 501 try: 502 return self._tap.addr 503 except: 504 return '' 505 506 @property 507 def netmask(self): 508 if self.closed: 509 raise ValueError('I/O operation on closed tap') 510 try: 511 return self._tap.netmask 512 except: 513 return '' 514 515 @property 516 def mtu(self): 517 if self.closed: 518 raise ValueError('I/O operation on closed tap') 519 return self._tap.mtu 520 521 @address.setter 522 def address(self, address): 523 if self.closed: 524 raise ValueError('I/O operation on closed tap') 525 self._tap.addr = address 526 527 @netmask.setter 528 def netmask(self, netmask): 529 if self.closed: 530 raise ValueError('I/O operation on closed tap') 531 self._tap.netmask = netmask 532 533 @mtu.setter 534 def mtu(self, mtu): 535 if self.closed: 536 raise ValueError('I/O operation on closed tap') 537 self._tap.mtu = mtu 538 539 def open(self): 540 if not self.closed: 541 raise ValueError('Already opened') 542 self._tap = TunTapDevice(self._dev, IFF_TAP | IFF_NO_PI) 543 self._tap.up() 544 self._ioloop.add_handler(self.fileno(), self, self._ioloop.READ) 545 return self._switch.register_port(self) 546 547 def close(self): 548 if self.closed: 549 raise ValueError('I/O operation on closed tap') 550 self._switch.unregister_port(self) 551 self._ioloop.remove_handler(self.fileno()) 552 self._tap.close() 553 self._tap = None 554 555 def fileno(self): 556 if self.closed: 557 raise ValueError('I/O operation on closed tap') 558 return self._tap.fileno() 559 560 def write_message(self, message, binary=False): 561 if self.closed: 562 raise ValueError('I/O operation on closed tap') 563 self._tap.write(message) 564 565 def __call__(self, fd, events): 566 try: 567 self._switch.receive(self, EthernetFrame(self._read())) 568 return 569 except: 570 traceback.print_exc() 571 self.close() 572 573 def _read(self): 574 if self.closed: 575 raise ValueError('I/O operation on closed tap') 576 buf = [] 577 while True: 578 buf.append(self._tap.read(self.READ_SIZE)) 579 if len(buf[-1]) < self.READ_SIZE: 580 break 581 return ''.join(buf) 582 583 584 class EtherWebSocketClient(DebugMixIn): 585 IFTYPE = 'client' 586 587 def __init__(self, ioloop, switch, url, ssl_=None, cred=None, debug=False): 588 self._ioloop = ioloop 589 self._switch = switch 590 self._url = url 591 self._ssl = ssl_ 592 self._debug = debug 593 self._sock = None 594 self._options = {} 595 596 if isinstance(cred, dict) and cred['user'] and cred['passwd']: 597 token = base64.b64encode('%s:%s' % (cred['user'], cred['passwd'])) 598 auth = ['Authorization: Basic %s' % token] 599 self._options['header'] = auth 600 601 @property 602 def target(self): 603 return self._url 604 605 @property 606 def closed(self): 607 return not self._sock 608 609 def open(self): 610 sslwrap = websocket._SSLSocketWrapper 611 612 if not self.closed: 613 raise websocket.WebSocketException('Already opened') 614 615 if self._ssl: 616 websocket._SSLSocketWrapper = self._ssl 617 618 # XXX: may be blocked 619 try: 620 self._sock = websocket.WebSocket() 621 self._sock.connect(self._url, **self._options) 622 self._ioloop.add_handler(self.fileno(), self, self._ioloop.READ) 623 self.dprintf('connected: %s\n', lambda: self._url) 624 return self._switch.register_port(self) 625 finally: 626 websocket._SSLSocketWrapper = sslwrap 627 628 def close(self): 629 if self.closed: 630 raise websocket.WebSocketException('Already closed') 631 self._switch.unregister_port(self) 632 self._ioloop.remove_handler(self.fileno()) 633 self._sock.close() 634 self._sock = None 635 self.dprintf('disconnected: %s\n', lambda: self._url) 636 637 def fileno(self): 638 if self.closed: 639 raise websocket.WebSocketException('Closed socket') 640 return self._sock.io_sock.fileno() 641 642 def write_message(self, message, binary=False): 643 if self.closed: 644 raise websocket.WebSocketException('Closed socket') 645 if binary: 646 flag = websocket.ABNF.OPCODE_BINARY 647 else: 648 flag = websocket.ABNF.OPCODE_TEXT 649 self._sock.send(message, flag) 650 651 def __call__(self, fd, events): 652 try: 653 data = self._sock.recv() 454 data = self.read() 654 455 if data is not None: 655 456 self._switch.receive(self, EthernetFrame(data)) … … 659 460 self.close() 660 461 661 662 class EtherWebSocketControlHandler(DebugMixIn, BasicAuthMixIn, RequestHandler): 462 def join_switch(self): 463 self._ioloop.add_handler(self.fileno(), self, self._ioloop.READ) 464 return self._switch.register_port(self) 465 466 def leave_switch(self): 467 self._switch.unregister_port(self) 468 self._ioloop.remove_handler(self.fileno()) 469 470 471 class NetdevHandler(BaseClientHandler): 472 IFTYPE = 'netdev' 473 IFOP_ALLOWED = True 474 ETH_P_ALL = 0x0003 # from <linux/if_ether.h> 475 476 @property 477 def address(self): 478 if self.closed: 479 raise ValueError('I/O operation on closed netdev') 480 return '' 481 482 @property 483 def netmask(self): 484 if self.closed: 485 raise ValueError('I/O operation on closed netdev') 486 return '' 487 488 @property 489 def mtu(self): 490 if self.closed: 491 raise ValueError('I/O operation on closed netdev') 492 return '' 493 494 @address.setter 495 def address(self, address): 496 if self.closed: 497 raise ValueError('I/O operation on closed netdev') 498 raise NotImplementedError('unsupported') 499 500 @netmask.setter 501 def netmask(self, netmask): 502 if self.closed: 503 raise ValueError('I/O operation on closed netdev') 504 raise NotImplementedError('unsupported') 505 506 @mtu.setter 507 def mtu(self, mtu): 508 if self.closed: 509 raise ValueError('I/O operation on closed netdev') 510 raise NotImplementedError('unsupported') 511 512 def open(self): 513 if not self.closed: 514 raise ValueError('Already opened') 515 self.register_device(socket.socket( 516 socket.PF_PACKET, socket.SOCK_RAW, socket.htons(self.ETH_P_ALL))) 517 self.device.bind((self.target, self.ETH_P_ALL)) 518 return self.join_switch() 519 520 def write_message(self, message, binary=False): 521 if self.closed: 522 raise ValueError('I/O operation on closed netdev') 523 self.device.sendall(message) 524 525 def read(self): 526 if self.closed: 527 raise ValueError('I/O operation on closed netdev') 528 buf = [] 529 while True: 530 buf.append(self.device.recv(65535)) 531 if len(buf[-1]) < 65535: 532 break 533 return ''.join(buf) 534 535 536 class TapHandler(BaseClientHandler): 537 IFTYPE = 'tap' 538 IFOP_ALLOWED = True 539 540 @property 541 def address(self): 542 if self.closed: 543 raise ValueError('I/O operation on closed tap') 544 try: 545 return self.device.addr 546 except: 547 return '' 548 549 @property 550 def netmask(self): 551 if self.closed: 552 raise ValueError('I/O operation on closed tap') 553 try: 554 return self.device.netmask 555 except: 556 return '' 557 558 @property 559 def mtu(self): 560 if self.closed: 561 raise ValueError('I/O operation on closed tap') 562 return self.device.mtu 563 564 @address.setter 565 def address(self, address): 566 if self.closed: 567 raise ValueError('I/O operation on closed tap') 568 self.device.addr = address 569 570 @netmask.setter 571 def netmask(self, netmask): 572 if self.closed: 573 raise ValueError('I/O operation on closed tap') 574 self.device.netmask = netmask 575 576 @mtu.setter 577 def mtu(self, mtu): 578 if self.closed: 579 raise ValueError('I/O operation on closed tap') 580 self.device.mtu = mtu 581 582 @property 583 def target(self): 584 if self.closed: 585 return self._target 586 return self.device.name 587 588 def open(self): 589 if not self.closed: 590 raise ValueError('Already opened') 591 self.register_device(TunTapDevice(self.target, IFF_TAP | IFF_NO_PI)) 592 self.device.up() 593 return self.join_switch() 594 595 def write_message(self, message, binary=False): 596 if self.closed: 597 raise ValueError('I/O operation on closed tap') 598 self.device.write(message) 599 600 def read(self): 601 if self.closed: 602 raise ValueError('I/O operation on closed tap') 603 buf = [] 604 while True: 605 buf.append(self.device.read(65535)) 606 if len(buf[-1]) < 65535: 607 break 608 return ''.join(buf) 609 610 611 class ClientHandler(BaseClientHandler): 612 IFTYPE = 'client' 613 IFOP_ALLOWED = False 614 615 def __init__(self, *args, **kwargs): 616 super(ClientHandler, self).__init__(*args, **kwargs) 617 618 self._ssl = kwargs.get('ssl', False) 619 self._options = {} 620 621 cred = kwargs.get('cred', None) 622 623 if isinstance(cred, dict) and cred['user'] and cred['passwd']: 624 token = base64.b64encode('%s:%s' % (cred['user'], cred['passwd'])) 625 auth = ['Authorization: Basic %s' % token] 626 self._options['header'] = auth 627 628 def open(self): 629 sslwrap = websocket._SSLSocketWrapper 630 631 if not self.closed: 632 raise websocket.WebSocketException('Already opened') 633 634 if self._ssl: 635 websocket._SSLSocketWrapper = self._ssl 636 637 # XXX: may be blocked 638 try: 639 self.register_device(websocket.WebSocket()) 640 self.device.connect(self.target, **self._options) 641 self.dprintf('connected: %s\n', lambda: self.target) 642 return self.join_switch() 643 finally: 644 websocket._SSLSocketWrapper = sslwrap 645 646 def fileno(self): 647 if self.closed: 648 raise websocket.WebSocketException('Closed socket') 649 return self.device.io_sock.fileno() 650 651 def write_message(self, message, binary=False): 652 if self.closed: 653 raise websocket.WebSocketException('Closed socket') 654 if binary: 655 flag = websocket.ABNF.OPCODE_BINARY 656 else: 657 flag = websocket.ABNF.OPCODE_TEXT 658 self.device.send(message, flag) 659 660 def read(self): 661 if self.closed: 662 raise websocket.WebSocketException('Closed socket') 663 return self.device.recv() 664 665 666 class ControlServerHandler(DebugMixIn, BasicAuthMixIn, RequestHandler): 663 667 NAMESPACE = 'etherws.control' 664 668 IFTYPES = { 665 NetdevHandler.IFTYPE: 666 TapHandler.IFTYPE: 667 EtherWebSocketClient.IFTYPE: EtherWebSocketClient,669 NetdevHandler.IFTYPE: NetdevHandler, 670 TapHandler.IFTYPE: TapHandler, 671 ClientHandler.IFTYPE: ClientHandler, 668 672 } 669 673 670 def __init__(self, app, req, ioloop, switch, htpasswd =None, debug=False):671 super( EtherWebSocketControlHandler, self).__init__(app, req)674 def __init__(self, app, req, ioloop, switch, htpasswd, debug): 675 super(ControlServerHandler, self).__init__(app, req) 672 676 self._ioloop = ioloop 673 677 self._switch = switch … … 738 742 type_ = params['type'] 739 743 target = params['target'] 740 opt s= getattr(self, '_optparse_' + type_)(params.get('options', {}))744 opt = getattr(self, '_optparse_' + type_)(params.get('options', {})) 741 745 cls = self.IFTYPES[type_] 742 interface = cls(self._ioloop, self._switch, target, **opts)746 interface = cls(self._ioloop, self._switch, target, self._debug, **opt) 743 747 portnum = interface.open() 744 748 return {'entries': [self._portstat(self._switch.get_port(portnum))]} … … 762 766 netmask = params.get('netmask') 763 767 mtu = params.get('mtu') 764 if isinstance(port.interface, EtherWebSocketClient):768 if not port.interface.IFOP_ALLOWED: 765 769 raise ValueError('Port %d has unsupported interface: %s' % 766 770 (portnum, port.interface.IFTYPE)) … … 775 779 def handle_listInterface(self, params): 776 780 return {'entries': [self._ifstat(p) for p in self._switch.portlist 777 if not isinstance(p.interface, 778 EtherWebSocketClient)]} 781 if p.interface.IFOP_ALLOWED]} 779 782 780 783 def _optparse_netdev(self, opt): 781 return { 'debug': self._debug}784 return {} 782 785 783 786 def _optparse_tap(self, opt): 784 return { 'debug': self._debug}787 return {} 785 788 786 789 def _optparse_client(self, opt): … … 790 793 ssl_ = lambda sock: ssl.wrap_socket(sock, **args) 791 794 cred = {'user': opt.get('user'), 'passwd': opt.get('passwd')} 792 return {'ssl _': ssl_, 'cred': cred, 'debug': self._debug}795 return {'ssl': ssl_, 'cred': cred} 793 796 794 797 def _jsonrpc_response(self, id_=None, result=None, error=None): … … 911 914 912 915 ioloop = IOLoop.instance() 913 fdb = FDB(a geout=args.ageout, debug=args.debug)914 switch = SwitchingHub(fdb, debug=args.debug)916 fdb = FDB(args.ageout, args.debug) 917 switch = SwitchingHub(fdb, args.debug) 915 918 916 919 if args.port == args.ctlport and args.host == args.ctlhost: … … 923 926 924 927 app = Application([ 925 (args.path, EtherWebSocketHandler, {928 (args.path, ServerHandler, { 926 929 'switch': switch, 927 930 'htpasswd': args.htpasswd, 928 931 'debug': args.debug, 929 932 }), 930 (args.ctlpath, EtherWebSocketControlHandler, {933 (args.ctlpath, ControlServerHandler, { 931 934 'ioloop': ioloop, 932 935 'switch': switch, … … 939 942 940 943 else: 941 app = Application([(args.path, EtherWebSocketHandler, {944 app = Application([(args.path, ServerHandler, { 942 945 'switch': switch, 943 946 'htpasswd': args.htpasswd, … … 947 950 server.listen(args.port, address=args.host) 948 951 949 ctl = Application([(args.ctlpath, EtherWebSocketControlHandler, {952 ctl = Application([(args.ctlpath, ControlServerHandler, { 950 953 'ioloop': ioloop, 951 954 'switch': switch, … … 971 974 token = base64.b64encode('%s:%s' % (args.ctluser, args.ctlpasswd)) 972 975 req.add_header('Authorization', 'Basic %s' % token) 973 method = '.'.join([ EtherWebSocketControlHandler.NAMESPACE, method])976 method = '.'.join([ControlServerHandler.NAMESPACE, method]) 974 977 data = {'jsonrpc': '2.0', 'method': method, 'id': id_} 975 978 if params is not None: … … 1017 1020 'insecure': getattr(args, 'insecure', None), 1018 1021 } 1019 if args.iftype == EtherWebSocketClient.IFTYPE:1022 if args.iftype == ClientHandler.IFTYPE: 1020 1023 if not args.target.startswith('ws://') and \ 1021 1024 not args.target.startswith('wss://'): … … 1184 1187 1185 1188 # --- ctl addport client 1186 parser_ctl_addport_client = iftype.add_parser( EtherWebSocketClient.IFTYPE,1189 parser_ctl_addport_client = iftype.add_parser(ClientHandler.IFTYPE, 1187 1190 help='WebSocket client') 1188 1191 parser_ctl_addport_client.add_argument('target',
Note: See TracChangeset
for help on using the changeset viewer.