架构 模块示意图 这里参考的是lbaasv2的driver,neutron_lbaas.conf中的service_provider即为lbaasv2的driver:service_provider=LOADBALANCERV2:Haproxy:neutron_lbaas.drivers.haproxy.plugin_driver.HaproxyOnHostPluginDriver:default
neutron.conf中的service_plugins表示了lbaasv2的plugin:lbaasv2 = neutron_lbaas.services.loadbalancer.plugin:LoadBalancerPluginv2
plugin neutron_lbaas.services.loadbalancer.plugin.py
1 2 3 4 class LoadBalancerPluginv2 (loadbalancerv2.LoadBalancerPluginBaseV2 ): def __init__ (self ): """Initialization for the loadbalancer service plugin.""" self.db = ldbv2.LoadBalancerPluginDbv2()
driver neutron_lbaas.drivers.haproxy.plugin_driver.py
1 2 class HaproxyOnHostPluginDriver (agent_driver_base.AgentDriverBase ): device_driver = namespace_driver.DRIVER_NAME
agent rpc(plugin向agent发送) neutron_lbaas.drivers.common.agent_driver_base.py
class LoadBalancerAgentApi
def __init__
plugin rpc(agent向plugin发送) neutron_lbaas.agent.agent_api.py
class LbaasAgentApi
def __init__
agent 侧的回调 neutron_lbaas.agent.agent_manager.py
class LbaasAgentManager
plugin 侧的回调 neutron_lbaas.drivers.common.agent_driver_base.py
class AgentDriverBase
def _set_callbacks_on_plugin
neutron_lbaas.drivers.common.agent_callbacks.py
class LoadBalancerCallbacks
agent入口函数 neutron_lbaas.agent.agent.py
def main
db处理 neutron_lbaas.db.loadbalancer.loadbalancer_dbv2.py
class LoadBalancerPluginDbv2
流程示例,创建一个Pool plugin部分 首先进入一个同步流程,即创建pool的数据库信息。
plugin入口 neutron_lbaas.services.loadbalancer.plugin.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class LoadBalancerPluginv2 (loadbalancerv2.LoadBalancerPluginBaseV2 ): def create_pool (self, context, pool ): try : db_pool = self.db.create_pool_and_add_to_listener(context, pool, listener_id) except Exception as exc: self.db.update_loadbalancer_provisioning_status( context, db_listener.loadbalancer.id ) raise exc driver = self._get_driver_for_loadbalancer( context, db_pool.listener.loadbalancer_id) self._call_driver_operation(context, driver.pool.create, db_pool) return self.db.get_pool(context, db_pool.id ).to_api_dict()
plugin的api及rpc处理 neutron_lbaas.drivers.common.agent_driver_base.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 class AgentDriverBase (driver_base.LoadBalancerBaseDriver ): device_driver = None def __init__ (self, plugin ): super (AgentDriverBase, self).__init__(plugin) self.pool = PoolManager(self) self.agent_rpc = LoadBalancerAgentApi(lb_const.LOADBALANCER_AGENTV2) self._set_callbacks_on_plugin() self.plugin.db.agent_notifiers.update( {lb_const.AGENT_TYPE_LOADBALANCERV2: self.agent_rpc}) lb_sched_driver = provconf.get_provider_driver_class( cfg.CONF.loadbalancer_scheduler_driver, LB_SCHEDULERS) self.loadbalancer_scheduler = importutils.import_object( lb_sched_driver) class PoolManager (driver_base.BasePoolManager ): def create (self, context, pool ): super (PoolManager, self).delete(context, pool) agent = self.driver.get_loadbalancer_agent( context, pool.listener.loadbalancer.id ) self.driver.agent_rpc.create_pool(context, pool, agent['host' ]) class LoadBalancerAgentApi (object ): """Plugin side of plugin to agent RPC API.""" def __init__ (self, topic ): target = messaging.Target(topic=topic, version='1.0' ) self.client = n_rpc.get_client(target, serializer=DataModelSerializer()) def create_pool (self, context, pool, host ): cctxt = self.client.prepare(server=host) cctxt.cast(context, 'create_pool' , pool=pool)
agent(driver)部分 查看入口函数,可见LbaasAgentManager为api的manager
agent的入口 neutron_lbaas.agent.agent.py - main
1 2 3 4 5 6 7 8 9 10 def main (): mgr = manager.LbaasAgentManager(cfg.CONF) svc = LbaasAgentService( host=cfg.CONF.host, topic=constants.LOADBALANCER_AGENTV2, manager=mgr ) service.launch(cfg.CONF, svc).wait()
agent处理plugin下发的消息 neutron_lbaas.agent.agent_manager.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class LbaasAgentManager (periodic_task.PeriodicTasks ): def create_pool (self, context, pool ): pool = data_models.Pool.from_dict(pool) driver = self._get_driver(pool.listener.loadbalancer.id ) try : driver.pool.create(pool) except Exception: self._handle_failed_driver_call('create' , pool, driver.get_name()) else : self._update_statuses(pool)
agent将任务下发给driver neutron_lbaas.drivers.haproxy.namespace_driver.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 NS_PREFIX = 'qlbaas-' def get_ns_name (namespace_id ): return NS_PREFIX + namespace_id class HaproxyNSDriver (agent_device_driver.AgentDeviceDriver ): def __init__ (self, conf, plugin_rpc ): super (HaproxyNSDriver, self).__init__(conf, plugin_rpc) self._pool = PoolManager(self) self._loadbalancer = LoadBalancerManager(self) @property def loadbalancer (self ): return self._loadbalancer @property def pool (self ): return self._pool
neutron_lbaas.drivers.haproxy.namespace_driver.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class PoolManager (agent_device_driver.BasePoolManager ): def create (self, pool ): self.driver.loadbalancer.refresh(pool.listener.loadbalancer) class LoadBalancerManager (agent_device_driver.BaseLoadBalancerManager ): def refresh (self, loadbalancer ): loadbalancer_dict = self.driver.plugin_rpc.get_loadbalancer( loadbalancer.id ) loadbalancer = data_models.LoadBalancer.from_dict(loadbalancer_dict) if (not self.driver.deploy_instance(loadbalancer) and self.driver.exists(loadbalancer.id )): self.driver.undeploy_instance(loadbalancer.id )
driver通过rpc访问plugin侧的db neutron_lbaas.agent.agent_api.py
1 2 3 4 5 6 7 8 class LbaasAgentApi (object ): def get_loadbalancer (self, loadbalancer_id ): cctxt = self.client.prepare() return cctxt.call(self.context, 'get_loadbalancer' , loadbalancer_id=loadbalancer_id)
neutron_lbaas.drivers.common.agent_callbacks.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class LoadBalancerCallbacks (object ): def get_loadbalancer (self, context, loadbalancer_id=None ): lb_model = self.plugin.db.get_loadbalancer(context, loadbalancer_id) if lb_model.vip_port and lb_model.vip_port.fixed_ips: for fixed_ip in lb_model.vip_port.fixed_ips: subnet_dict = self.plugin.db._core_plugin.get_subnet( context, fixed_ip.subnet_id ) setattr (fixed_ip, 'subnet' , data_models.Subnet.from_dict( subnet_dict)) if lb_model.provider: device_driver = self.plugin.drivers[ lb_model.provider.provider_name].device_driver setattr (lb_model.provider, 'device_driver' , device_driver) lb_dict = lb_model.to_dict(stats=False ) return lb_dict
neutron_lbaas.db.loadbalancer.loadbalancer_dbv2.py
1 2 3 4 5 6 7 class LoadBalancerPluginDbv2 (base_db.CommonDbMixin, agent_scheduler.LbaasAgentSchedulerDbMixin ): def get_loadbalancer (self, context, id ): lb_db = self._get_resource(context, models.LoadBalancer, id ) return data_models.LoadBalancer.from_sqlalchemy_model(lb_db)
driver开始执行实际操作 neutron_lbaas.drivers.haproxy.namespace_driver.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 class HaproxyNSDriver (agent_device_driver.AgentDeviceDriver ): @n_utils.synchronized('haproxy-driver' ) def deploy_instance (self, loadbalancer ): if not self.deployable(loadbalancer): LOG.info(_LI("Loadbalancer %s is not deployable." ) % loadbalancer.id ) return False if self.exists(loadbalancer.id ): self.update(loadbalancer) else : self.create(loadbalancer) return True def deployable (self, loadbalancer ): if not loadbalancer: return False acceptable_listeners = [ listener for listener in loadbalancer.listeners if (listener.provisioning_status != constants.PENDING_DELETE and listener.admin_state_up)] return (bool (acceptable_listeners) and loadbalancer.admin_state_up and loadbalancer.provisioning_status != constants.PENDING_DELETE) def create (self, loadbalancer ): namespace = get_ns_name(loadbalancer.id ) self._plug(namespace, loadbalancer.vip_port, loadbalancer.vip_address) self._spawn(loadbalancer) def _plug (self, namespace, port, vip_address, reuse_existing=True ): self.plugin_rpc.plug_vip_port(port.id ) interface_name = self.vif_driver.get_device_name(port) if ip_lib.device_exists(interface_name, namespace=namespace): if not reuse_existing: raise exceptions.PreexistingDeviceFailure( dev_name=interface_name ) else : self.vif_driver.plug( port.network_id, port.id , interface_name, port.mac_address, namespace=namespace ) self.vif_driver.init_l3(interface_name, cidrs, namespace=namespace) if netaddr.IPAddress(vip_address).version == 6 : device = ip_lib.IPDevice(interface_name, namespace=namespace) device.addr.wait_until_address_ready(vip_address) gw_ip = port.fixed_ips[0 ].subnet.gateway_ip if not gw_ip: host_routes = port.fixed_ips[0 ].subnet.host_routes for host_route in host_routes: if host_route.destination == "0.0.0.0/0" : gw_ip = host_route.nexthop break else : cmd = ['route' , 'add' , 'default' , 'gw' , gw_ip] ip_wrapper = ip_lib.IPWrapper(namespace=namespace) ip_wrapper.netns.execute(cmd, check_exit_code=False ) gratuitous_arp = self.conf.haproxy.send_gratuitous_arp if gratuitous_arp > 0 : for ip in port.fixed_ips: cmd_arping = ['arping' , '-U' , '-I' , interface_name, '-c' , gratuitous_arp, ip.ip_address] ip_wrapper.netns.execute(cmd_arping, check_exit_code=False ) def _spawn (self, loadbalancer, extra_cmd_args=( ) ): namespace = get_ns_name(loadbalancer.id ) conf_path = self._get_state_file_path(loadbalancer.id , 'haproxy.conf' ) pid_path = self._get_state_file_path(loadbalancer.id , 'haproxy.pid' ) sock_path = self._get_state_file_path(loadbalancer.id , 'haproxy_stats.sock' ) user_group = self.conf.haproxy.user_group haproxy_base_dir = self._get_state_file_path(loadbalancer.id , '' ) jinja_cfg.save_config(conf_path, loadbalancer, sock_path, user_group, haproxy_base_dir) cmd = ['haproxy' , '-f' , conf_path, '-p' , pid_path] cmd.extend(extra_cmd_args) ns = ip_lib.IPWrapper(namespace=namespace) ns.netns.execute(cmd) self.deployed_loadbalancers[loadbalancer.id ] = loadbalancer
操作步骤(未实践)
如使用rdo安装,需要设置neutron的lbaas地址,--neutron-lbaas-hosts
迁移lbaas的数据库至neutron数据库1 $ neutron-db-manage --service lbaas upgrade head
创建一个用于提供vip的subnet,此处命名private-subnet
创建loadbalancer,此处命名test-lb1 $ neutron lbaas-loadbalancer-create --name test-lb private-subnet
创建listener,此处命名test-lb-http1 2 3 4 5 $ neutron lbaas-listener-create \ --name test-lb-http \ --loadbalancer test-lb \ --protocol HTTP \ --protocol-port 80
创建pool,此处命名test-lb-pool-http1 2 3 4 5 $ neutron lbaas-pool-create \ --name test-lb-pool-http \ --lb-algorithm ROUND_ROBIN \ --listener test-lb-http \ --protocol HTTP
创建member1 2 3 4 5 6 7 8 9 10 $ neutron lbaas-member-create \ --subnet private-subnet \ --address 192.168.1.16 \ --protocol-port 80 \ test-lb-pool-http $ neutron lbaas-member-create \ --subnet private-subnet \ --address 192.168.1.17 \ --protocol-port 80 \ test-lb-pool-http
创建healthmonitor1 2 3 4 5 6 $ neutron lbaas-healthmonitor-create \ --delay 5 \ --max-retries 2 \ --timeout 10 \ --type HTTP \ --pool test-lb-pool-http