 6e592fc922
			
		
	
	
		6e592fc922
		
	
	
	
	
		
			
			Currently 083 waits for the nbd-fault-injector.py server to start up by looping until netstat shows the TCP listen socket. The startup protocol can be simplified by passing a 0 port number to nbd-fault-injector.py. The kernel will allocate a port in bind(2) and the final port number can be printed by nbd-fault-injector.py. This should make it slightly nicer and less TCP-specific to wait for server startup. This patch changes nbd-fault-injector.py, the next one will rewrite server startup in 083. Reviewed-by: Eric Blake <eblake@redhat.com> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> Message-Id: <20170829122745.14309-3-stefanha@redhat.com> Signed-off-by: Eric Blake <eblake@redhat.com>
		
			
				
	
	
		
			269 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			269 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python
 | |
| # NBD server - fault injection utility
 | |
| #
 | |
| # Configuration file syntax:
 | |
| #   [inject-error "disconnect-neg1"]
 | |
| #   event=neg1
 | |
| #   io=readwrite
 | |
| #   when=before
 | |
| #
 | |
| # Note that Python's ConfigParser squashes together all sections with the same
 | |
| # name, so give each [inject-error] a unique name.
 | |
| #
 | |
| # inject-error options:
 | |
| #   event - name of the trigger event
 | |
| #           "neg-classic" - classic negotiation struct
 | |
| #           "neg1" - first part of negotiation struct
 | |
| #           "export" - export struct
 | |
| #           "neg2" - second part of negotiation struct
 | |
| #           "request" - NBD request struct
 | |
| #           "reply" - NBD reply struct
 | |
| #           "data" - request/reply data
 | |
| #   io    - I/O direction that triggers this rule:
 | |
| #           "read", "write", or "readwrite"
 | |
| #           default: readwrite
 | |
| #   when  - after how many bytes to inject the fault
 | |
| #           -1 - inject error after I/O
 | |
| #           0 - inject error before I/O
 | |
| #           integer - inject error after integer bytes
 | |
| #           "before" - alias for 0
 | |
| #           "after" - alias for -1
 | |
| #           default: before
 | |
| #
 | |
| # Currently the only error injection action is to terminate the server process.
 | |
| # This resets the TCP connection and thus forces the client to handle
 | |
| # unexpected connection termination.
 | |
| #
 | |
| # Other error injection actions could be added in the future.
 | |
| #
 | |
| # Copyright Red Hat, Inc. 2014
 | |
| #
 | |
| # Authors:
 | |
| #   Stefan Hajnoczi <stefanha@redhat.com>
 | |
| #
 | |
| # This work is licensed under the terms of the GNU GPL, version 2 or later.
 | |
| # See the COPYING file in the top-level directory.
 | |
| 
 | |
import collections
import socket
import struct
import sys

try:
    import ConfigParser  # Python 2 module name
except ImportError:
    import configparser as ConfigParser  # Python 3 renamed the module
 | |
| 
 | |
# Size of the fake export announced during negotiation
FAKE_DISK_SIZE = 8 * 1024 ** 3  # 8 GB

# Protocol constants: command types carried in the request "type" field
NBD_CMD_READ = 0
NBD_CMD_WRITE = 1
NBD_CMD_DISC = 2

# Protocol constants: magic numbers
NBD_REQUEST_MAGIC = 0x25609513
NBD_REPLY_MAGIC = 0x67446698
NBD_PASSWD = 0x4E42444D41474943        # ASCII "NBDMAGIC"
NBD_OPTS_MAGIC = 0x49484156454F5054    # ASCII "IHAVEOPT"
NBD_CLIENT_MAGIC = 0x0000420281861253

# Protocol constants: the only option accepted in the export struct
NBD_OPT_EXPORT_NAME = 1

# Protocol structs (all big-endian, no implicit padding)

# Classic negotiation: three 64-bit fields, one 32-bit field, 124 pad bytes
neg_classic_struct = struct.Struct('>QQQI124x')

# Export negotiation, part 1: two 64-bit fields and one 16-bit field
neg1_struct = struct.Struct('>QQH')

# Export option sent by the client, and its unpacked form
export_struct = struct.Struct('>IQII')
export_tuple = collections.namedtuple('Export', 'reserved magic opt len')

# Export negotiation, part 2: 64-bit size, one 16-bit field, 124 pad bytes
neg2_struct = struct.Struct('>QH124x')

# Request sent by the client, and its unpacked form
request_struct = struct.Struct('>IIQQI')
request_tuple = collections.namedtuple('Request', 'magic type handle from_ len')

# Reply sent by the server: magic, error, handle
reply_struct = struct.Struct('>IIQ')
 | |
| 
 | |
def err(msg):
    '''Write msg plus a newline to stderr, then exit with status 1.'''
    line = msg + '\n'
    sys.stderr.write(line)
    raise SystemExit(1)
 | |
| 
 | |
def recvall(sock, bufsize):
    '''Receive exactly bufsize bytes from sock.

    Keeps calling sock.recv() until bufsize bytes have been collected and
    returns them as a single byte string.  Raises Exception if recv()
    returns an empty result before enough data has arrived.
    '''
    received = 0
    chunks = []
    while received < bufsize:
        chunk = sock.recv(bufsize - received)
        if not chunk:
            raise Exception('unexpected disconnect')
        chunks.append(chunk)
        received += len(chunk)
    # Join as bytes: recv() yields bytes on Python 3, where ''.join() would
    # raise TypeError.  On Python 2 b'' is the same type as ''.
    return b''.join(chunks)
 | |
| 
 | |
class Rule(object):
    '''One fault injection rule: a name, a trigger event, an I/O direction
    and the point ("when") at which the fault fires.'''

    def __init__(self, name, event, io, when):
        self.name, self.event = name, event
        self.io, self.when = io, when

    def match(self, event, io):
        '''Return True if this rule applies to event in direction io.

        A rule whose io is 'readwrite' applies to both directions.
        '''
        if event == self.event:
            if self.io == 'readwrite' or io == self.io:
                return True
        return False
 | |
| 
 | |
class FaultInjectionSocket(object):
    '''Socket wrapper that exits the process when a rule matches an I/O.

    sock is the connected socket; rules is an iterable of objects providing
    match(event, io), a "when" value and a "name".
    '''

    def __init__(self, sock, rules):
        self.sock = sock
        self.rules = rules

    def check(self, event, io, bufsize=None):
        '''Apply the rules to one I/O and return how many bytes to transfer.

        Called with bufsize before the I/O and without bufsize after it.
        A matching rule exits the process (status 0) if its when is 0 or if
        this is the after-I/O call.  A matching rule with a when other than
        -1 truncates the transfer to that many bytes; otherwise bufsize is
        returned unchanged.
        '''
        for rule in self.rules:
            if not rule.match(event, io):
                continue
            if rule.when == 0 or bufsize is None:
                # Single-argument form prints the same on Python 2 and 3
                print('Closing connection on rule match %s' % rule.name)
                sys.exit(0)
            if rule.when != -1:
                # Never ask for more than the I/O itself: an oversized
                # "when" would make recv() wait for bytes that are not
                # part of this struct.
                return min(rule.when, bufsize)
        return bufsize

    def send(self, buf, event):
        '''Send buf (possibly truncated by a rule), then run the after-I/O check.'''
        bufsize = self.check(event, 'write', bufsize=len(buf))
        self.sock.sendall(buf[:bufsize])
        self.check(event, 'write')

    def recv(self, bufsize, event):
        '''Receive bufsize bytes (possibly truncated by a rule) and return them.'''
        bufsize = self.check(event, 'read', bufsize=bufsize)
        data = recvall(self.sock, bufsize)
        self.check(event, 'read')
        return data

    def close(self):
        '''Close the wrapped socket.'''
        self.sock.close()
 | |
| 
 | |
def negotiate_classic(conn):
    '''Send the single classic negotiation struct (event 'neg-classic').'''
    fields = (NBD_PASSWD, NBD_CLIENT_MAGIC, FAKE_DISK_SIZE, 0)
    conn.send(neg_classic_struct.pack(*fields), event='neg-classic')
 | |
| 
 | |
def negotiate_export(conn):
    '''Run the three-step export negotiation with the client.

    Sends negotiation part 1, reads the client's export option followed by
    the export name (which is read and discarded), then sends part 2 with
    the fake disk size.
    '''
    # Part 1: server greeting
    conn.send(neg1_struct.pack(NBD_PASSWD, NBD_OPTS_MAGIC, 0), event='neg1')

    # Export option header; only an export-name option is accepted
    raw = conn.recv(export_struct.size, event='export')
    option = export_tuple(*export_struct.unpack(raw))
    assert option.magic == NBD_OPTS_MAGIC
    assert option.opt == NBD_OPT_EXPORT_NAME

    # The name itself is not used, but must be drained from the socket
    conn.recv(option.len, event='export-name')

    # Part 2: export size
    conn.send(neg2_struct.pack(FAKE_DISK_SIZE, 0), event='neg2')
 | |
| 
 | |
def negotiate(conn, use_export):
    '''Negotiate export with client'''
    # Pick the handshake flavour first, then run it
    handshake = negotiate_export if use_export else negotiate_classic
    handshake(conn)
 | |
| 
 | |
def read_request(conn):
    '''Parse NBD request from client'''
    raw = conn.recv(request_struct.size, event='request')
    request = request_tuple(*request_struct.unpack(raw))
    # Anything without the request magic is not a request header
    assert request.magic == NBD_REQUEST_MAGIC
    return request
 | |
| 
 | |
def write_reply(conn, error, handle):
    '''Send a reply struct carrying error and the request's handle.'''
    conn.send(reply_struct.pack(NBD_REPLY_MAGIC, error, handle),
              event='reply')
 | |
| 
 | |
def handle_connection(conn, use_export):
    '''Serve one client connection until it disconnects.

    Negotiates first, then answers requests in a loop: reads are answered
    with a success reply followed by req.len zero bytes, writes have their
    payload read and discarded before a success reply.  The loop ends on
    NBD_CMD_DISC or on an unrecognized command, after which conn is closed.
    '''
    negotiate(conn, use_export)
    while True:
        req = read_request(conn)
        if req.type == NBD_CMD_READ:
            write_reply(conn, 0, req.handle)
            # Must be a byte string: sockets only accept bytes on Python 3
            conn.send(b'\0' * req.len, event='data')
        elif req.type == NBD_CMD_WRITE:
            # Payload is drained from the socket and ignored
            conn.recv(req.len, event='data')
            write_reply(conn, 0, req.handle)
        elif req.type == NBD_CMD_DISC:
            break
        else:
            # Single-argument form prints the same on Python 2 and 3
            print('unrecognized command type %#02x' % req.type)
            break
    conn.close()
 | |
| 
 | |
def run_server(sock, rules, use_export):
    '''Accept clients on sock forever, serving them one at a time.'''
    while True:
        client = sock.accept()[0]  # peer address is not needed
        wrapped = FaultInjectionSocket(client, rules)
        handle_connection(wrapped, use_export)
 | |
| 
 | |
def parse_inject_error(name, options):
    '''Build a Rule from the options of one inject-error config section.

    name is the section name (used in error messages and as the rule name);
    options is a dict of the section's key/value strings.  Any invalid or
    missing option is reported through err().
    '''
    valid_events = ('neg-classic', 'neg1', 'export', 'neg2', 'request', 'reply', 'data')
    valid_io = ('read', 'write', 'readwrite')
    when_aliases = {'before': 0, 'after': -1}

    if 'event' not in options:
        err('missing "event" option in %s' % name)
    event = options['event']
    if event not in valid_events:
        err('invalid "event" option value "%s" in %s' % (event, name))

    io = options.get('io', 'readwrite')
    if io not in valid_io:
        err('invalid "io" option value "%s" in %s' % (io, name))

    when = options.get('when', 'before')
    if when in when_aliases:
        # Neither alias parses as an integer, so resolving them first is
        # equivalent to trying int() and falling back
        when = when_aliases[when]
    else:
        try:
            when = int(when)
        except ValueError:
            err('invalid "when" option value "%s" in %s' % (when, name))

    return Rule(name, event, io, when)
 | |
| 
 | |
def parse_config(config):
    '''Turn every inject-error section of config into a rule.

    Returns the rules in section order.  A section whose name does not
    start with 'inject-error' is reported through err().
    '''
    rules = []
    for section in config.sections():
        if section.startswith('inject-error'):
            settings = dict(config.items(section))
            rules.append(parse_inject_error(section, settings))
            continue
        err('invalid config section name: %s' % section)
    return rules
 | |
| 
 | |
def load_rules(filename):
    '''Read the config file at filename and return its parsed rules.

    The file is parsed with RawConfigParser and handed to parse_config().
    '''
    config = ConfigParser.RawConfigParser()
    with open(filename, 'rt') as f:
        # readfp() is the only reader on Python 2 but was removed in
        # Python 3.12, so use read_file() whenever the parser offers it.
        read = getattr(config, 'read_file', None) or config.readfp
        read(f, filename)
    return parse_config(config)
 | |
| 
 | |
def open_socket(path):
    '''Open a TCP or UNIX domain listen socket

    A path containing ':' is treated as host:port and bound as a TCP
    socket; anything else is bound as a UNIX domain socket path.  The
    address actually bound is printed as 'Listening on ...' and stdout is
    flushed before the listening socket is returned.
    '''
    if ':' in path:
        host, port = path.split(':', 1)
        sock = socket.socket()
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, int(port)))

        # If given port was 0 the final port number is now available
        path = '%s:%d' % sock.getsockname()
    else:
        sock = socket.socket(socket.AF_UNIX)
        sock.bind(path)
    sock.listen(0)
    # Single-argument form prints the same on Python 2 and 3
    print('Listening on %s' % path)
    sys.stdout.flush() # another process may be waiting, show message now
    return sock
 | |
| 
 | |
def usage(args):
    '''Print the command-line synopsis to stderr and exit with status 1.

    args is the argument list; args[0] is shown as the program name.
    '''
    # The listen address is only treated as TCP when it contains ':',
    # so the synopsis shows host:port rather than a bare port number.
    sys.stderr.write('usage: %s [--classic-negotiation] <host>:<port>|<unix-path> <config-file>\n' % args[0])
    sys.stderr.write('Run a fault injector NBD server with rules defined in a config file.\n')
    sys.exit(1)
 | |
| 
 | |
def main(args):
    '''Parse the command line, open the listen socket and run the server.

    args is the full argument list including the program name.  Accepted
    forms are:
        prog <listen-address> <config-file>
        prog --classic-negotiation <listen-address> <config-file>
    Anything else prints the usage message and exits.
    '''
    positional = args[1:]
    use_export = True
    if positional and positional[0] == '--classic-negotiation':
        use_export = False
        positional = positional[1:]
    # Exactly two arguments must remain.  This also rejects the flag given
    # with a single argument, which would otherwise open a socket on the
    # config path and then fail on the missing config argument.
    if len(positional) != 2:
        usage(args)
    path, config_file = positional
    sock = open_socket(path)
    rules = load_rules(config_file)
    run_server(sock, rules, use_export)
    return 0
 | |
| 
 | |
# Script entry point: exit with whatever status main() returns
if __name__ == '__main__':
    raise SystemExit(main(sys.argv))
 |