. ' 'Configuration was %s') % servers)
self.servers = [
self.server_proxy_for(server, int(port))
for server, port in (s.rsplit(':', 1) for s in servers)
]
eventlet.spawn(self._consistency_watchdog,
cfg.CONF.RESTPROXY.consistency_interval)
LOG.debug(_("ServerPool: initialization done"))
**** CubicPower OpenStack Study ****
def get_capabilities(self):
    """Return the capabilities shared by every server in the pool.

    The set is computed on the first call and stored on the instance as
    ``self.capabilities``; subsequent calls return the stored value.
    """
    try:
        cached = self.capabilities
    except AttributeError:
        # nothing cached yet: fall through and ask the servers
        pass
    else:
        return cached
    # Each server reports a list of capabilities (e.g. ['floatingip']);
    # the pool only supports what all of the servers support.
    self.capabilities = set.intersection(
        *(set(backend.get_capabilities()) for backend in self.servers))
    return self.capabilities
**** CubicPower OpenStack Study ****
def server_proxy_for(self, server, port):
    """Build the ServerProxy used to talk to ``server``:``port``.

    The proxy receives the pool's own ssl/auth/neutron_id/timeout/base_uri/
    name settings, a back-reference to this pool and the combined
    certificate path produced by ``_get_combined_cert_for_server``.
    """
    return ServerProxy(
        server, port, self.ssl, self.auth, self.neutron_id, self.timeout,
        self.base_uri, self.name, mypool=self,
        combined_cert=self._get_combined_cert_for_server(server, port))
**** CubicPower OpenStack Study ****
def _get_combined_cert_for_server(self, server, port):
    """Return the path of the combined certificate file for ``server``.

    The ssl library requires a single file holding every trusted cert, so
    one is assembled from the trusted CAs plus the host certificate of
    this server. Returns None when ssl is off or validation is disabled.

    Raises cfg.Error if the configured ssl_cert_directory does not exist
    or if no certificate at all is available for the server.
    """
    if not self.ssl or cfg.CONF.RESTPROXY.no_ssl_validation:
        return None

    base_dir = cfg.CONF.RESTPROXY.ssl_cert_directory
    host_dir, ca_dir, combined_dir = (
        os.path.join(base_dir, sub)
        for sub in ('host_certs', 'ca_certs', 'combined'))
    combined_path = os.path.join(combined_dir, '%s.pem' % server)
    if not os.path.exists(base_dir):
        raise cfg.Error(_('ssl_cert_directory [%s] does not exist. '
                          'Create it or disable ssl.') % base_dir)
    # the sub-directories are created on demand
    for needed in (combined_dir, ca_dir, host_dir):
        if not os.path.exists(needed):
            os.makedirs(needed)

    # start from all CA certs, then add the host specific one
    trusted = self._get_ca_cert_paths(ca_dir)
    host_cert, present = self._get_host_cert_path(host_dir, server)
    if not present and cfg.CONF.RESTPROXY.ssl_sticky:
        # sticky mode: fetch the server's certificate once and keep it
        self._fetch_and_store_cert(server, port, host_cert)
        present = True
    if present:
        trusted.append(host_cert)
    if not trusted:
        raise cfg.Error(_('No certificates were found to verify '
                          'controller %s') % server)
    self._combine_certs_to_file(trusted, combined_path)
    return combined_path
**** CubicPower OpenStack Study ****
def _combine_certs_to_file(self, certs, cfile):
    """Concatenate certificate files into one combined file.

    Reads every path in ``certs`` in order and writes the contents to
    ``cfile`` (overwriting it) for use with ssl sockets.

    A newline is appended after any certificate that does not already end
    with one; otherwise the END marker of one certificate and the BEGIN
    marker of the next would share a line and the bundle could not be
    parsed.
    """
    with open(cfile, 'w') as combined:
        for cert_path in certs:
            with open(cert_path, 'r') as cert_handle:
                pem = cert_handle.read()
            combined.write(pem)
            if pem and not pem.endswith('\n'):
                combined.write('\n')
**** CubicPower OpenStack Study ****
def _get_host_cert_path(self, host_dir, server):
    """Locate the host certificate of ``server`` inside ``host_dir``.

    Returns a ``(path, exists)`` tuple: the full path ``<server>.pem``
    would have, and a boolean telling whether that file is present.
    """
    candidate = os.path.join(host_dir, '%s.pem' % server)
    return candidate, os.path.exists(candidate)
**** CubicPower OpenStack Study ****
def _get_ca_cert_paths(self, ca_dir):
    """Return the full paths of all ``*.pem`` files found under ``ca_dir``.

    The directory is walked recursively and every file is joined with the
    directory it was actually found in. (The previous nested comprehension
    referenced ``root`` outside the scope that bound it, which is a
    NameError on Python 3 and pairs every file with the last walked
    directory on Python 2.)
    """
    return [os.path.join(root, name)
            for root, _dirs, files in os.walk(ca_dir)
            for name in files
            if name.endswith('.pem')]
**** CubicPower OpenStack Study ****
def _fetch_and_store_cert(self, server, port, path):
    """Download the certificate presented by a server and save it.

    The certificate obtained with ``ssl.get_server_certificate`` is
    written to ``path`` through ``_file_put_contents`` and returned.
    Any error while retrieving it is re-raised as cfg.Error.
    """
    try:
        pem_cert = ssl.get_server_certificate((server, port))
    except Exception as exc:
        details = {'server': server, 'error': str(exc)}
        raise cfg.Error(_('Could not retrieve initial '
                          'certificate from controller %(server)s. '
                          'Error details: %(error)s') % details)

    location = {'server': server, 'path': path}
    LOG.warning(_("Storing to certificate for host %(server)s "
                  "at %(path)s") % location)
    self._file_put_contents(path, pem_cert)
    return pem_cert
**** CubicPower OpenStack Study ****
def _file_put_contents(self, path, contents):
    """Write ``contents`` to the file at ``path``, replacing what is there.

    Deliberately a tiny separate method so that it is easy to mock.
    """
    with open(path, 'w') as out_file:
        out_file.write(contents)
**** CubicPower OpenStack Study ****
def server_failure(self, resp, ignore_codes=[]):
    """Tell whether ``resp`` has to be treated as a failed call.

    The status code ``resp[0]`` counts as a failure when it is listed in
    FAILURE_CODES and not listed in ``ignore_codes``.

    Note: 301-303 are assumed to be failures so that the next server in
    the server pool is tried (FAILURE_CODES itself is defined elsewhere
    in this module).
    """
    status = resp[0]
    if status not in FAILURE_CODES:
        return False
    return status not in ignore_codes
**** CubicPower OpenStack Study ****
def action_success(self, resp):
    """Tell whether ``resp`` carries a successful status code.

    The status ``resp[0]`` is looked up in SUCCESS_CODES.
    Note: any valid 2xx is assumed to be a successful response
    (SUCCESS_CODES itself is defined elsewhere in this module).
    """
    status = resp[0]
    return status in SUCCESS_CODES
@utils.synchronized('bsn-rest-call')
def rest_call(self, action, resource, data, headers, ignore_codes,
              timeout=False):
    """Send a REST request, trying the pool's servers until one succeeds.

    Servers that are not marked ``failed`` are tried first. When a server
    answers 409 CONFLICT, the complete topology returned by
    ``self.get_topo_function`` is pushed to it with a PUT on TOPOLOGY_PATH.

    :param action: HTTP verb handed to the server proxy
    :param resource: path of the resource
    :param data: request payload; sent unchanged to every server tried
    :param headers: extra headers handed to the server proxy
    :param ignore_codes: status codes not to be counted as failures
    :param timeout: handed to the server proxy unchanged
    :returns: the response of the first server that did not fail;
        if all failed, the first response received (None when the pool
        has no servers). Responses are indexed as
        (status, reason, ret, data), matching the log fields below.
    :raises cfg.Error: when a server requires synchronization but no
        topology function was defined
    """
    # False sorts before True: servers without a recorded failure go first
    good_first = sorted(self.servers, key=lambda x: x.failed)
    first_response = None
    for active_server in good_first:
        ret = active_server.rest_call(action, resource, data, headers,
                                      timeout,
                                      reconnect=self.always_reconnect)
        # If inconsistent, do a full synchronization
        if ret[0] == httplib.CONFLICT:
            if not self.get_topo_function:
                raise cfg.Error(_('Server requires synchronization, '
                                  'but no topology function was defined.'))
            # Keep the topology in its own variable: rebinding ``data``
            # would make the remaining servers receive the topology
            # instead of the caller's payload.
            topo_data = self.get_topo_function(
                **self.get_topo_function_args)
            active_server.rest_call('PUT', TOPOLOGY_PATH, topo_data,
                                    timeout=None)
        # Store the first response as the error to be bubbled up to the
        # user since it was a good server. Subsequent servers will most
        # likely be cluster slaves and won't have a useful error for the
        # user (e.g. 302 redirect to master)
        if first_response is None:
            first_response = ret
        if not self.server_failure(ret, ignore_codes):
            active_server.failed = False
            return ret

        LOG.error(_('ServerProxy: %(action)s failure for servers: '
                    '%(server)r Response: %(response)s'),
                  {'action': action,
                   'server': (active_server.server,
                              active_server.port),
                   'response': ret[3]})
        LOG.error(_("ServerProxy: Error details: status=%(status)d, "
                    "reason=%(reason)r, ret=%(ret)s, data=%(data)r"),
                  {'status': ret[0], 'reason': ret[1], 'ret': ret[2],
                   'data': ret[3]})
        active_server.failed = True

    # All servers failed, reset server list and try again next time
    LOG.error(_('ServerProxy: %(action)s failure for all servers: '
                '%(server)r'),
              {'action': action,
               'server': tuple((s.server,
                                s.port) for s in self.servers)})
    return first_response
**** CubicPower OpenStack Study ****
def rest_action(self, action, resource, data='', errstr='%s',
                ignore_codes=None, headers=None, timeout=False):
    """Wrapper for rest_call that verifies success.

    Raises a RemoteRestError on failure, after logging the response text
    with the provided error string ``errstr``.

    By default, 404 errors on DELETE calls are ignored because
    they already do not exist on the backend.

    :param ignore_codes: status codes not treated as failures; when empty
        or omitted, ``[404]`` is used for DELETE and ``[]`` otherwise
    :param headers: extra request headers; a fresh dict is created per
        call when omitted, so nothing is shared between calls
    :returns: the response from rest_call, indexed with the status code
        at ``resp[0]`` and the response text at ``resp[2]``
    :raises RemoteRestError: if ``server_failure`` reports a failure
    """
    if headers is None:
        headers = {}
    if not ignore_codes:
        ignore_codes = [404] if action == 'DELETE' else []
    resp = self.rest_call(action, resource, data, headers, ignore_codes,
                          timeout)
    if self.server_failure(resp, ignore_codes):
        LOG.error(errstr, resp[2])
        raise RemoteRestError(reason=resp[2], status=resp[0])
    if resp[0] in ignore_codes:
        # report the status code (resp[0]), not the response text
        LOG.warning(_("NeutronRestProxyV2: Received and ignored error "
                      "code %(code)s on %(action)s action to resource "
                      "%(resource)s"),
                    {'code': resp[0], 'action': action,
                     'resource': resource})
    return resp
**** CubicPower OpenStack Study ****
def rest_create_router(self, tenant_id, router):
    """POST ``router`` to the tenant's router collection on the backend.

    Failures are raised by ``rest_action`` as RemoteRestError.
    """
    self.rest_action(
        'POST',
        ROUTER_RESOURCE_PATH % tenant_id,
        {"router": router},
        _("Unable to create remote router: %s"))
**** CubicPower OpenStack Study ****
def rest_update_router(self, tenant_id, router, router_id):
    """PUT ``router`` to the backend resource of ``router_id``.

    Failures are raised by ``rest_action`` as RemoteRestError.
    """
    self.rest_action(
        'PUT',
        ROUTERS_PATH % (tenant_id, router_id),
        {"router": router},
        _("Unable to update remote router: %s"))
**** CubicPower OpenStack Study ****
def rest_delete_router(self, tenant_id, router_id):
    """DELETE the backend resource of ``router_id``.

    ``rest_action`` ignores 404 on DELETE; other failures are raised
    as RemoteRestError.
    """
    self.rest_action(
        'DELETE',
        ROUTERS_PATH % (tenant_id, router_id),
        errstr=_("Unable to delete remote router: %s"))
**** CubicPower OpenStack Study ****
def rest_add_router_interface(self, tenant_id, router_id, intf_details):
    """POST ``intf_details`` as a new interface of ``router_id``.

    Failures are raised by ``rest_action`` as RemoteRestError.
    """
    self.rest_action(
        'POST',
        ROUTER_INTF_OP_PATH % (tenant_id, router_id),
        {"interface": intf_details},
        _("Unable to add router interface: %s"))
**** CubicPower OpenStack Study ****
def rest_remove_router_interface(self, tenant_id, router_id, interface_id):
    """DELETE interface ``interface_id`` of ``router_id`` on the backend.

    ``rest_action`` ignores 404 on DELETE; other failures are raised
    as RemoteRestError.
    """
    self.rest_action(
        'DELETE',
        ROUTER_INTF_PATH % (tenant_id, router_id, interface_id),
        errstr=_("Unable to delete remote intf: %s"))
**** CubicPower OpenStack Study ****
def rest_create_network(self, tenant_id, network):
    """POST ``network`` to the tenant's network collection on the backend.

    Failures are raised by ``rest_action`` as RemoteRestError.
    """
    self.rest_action(
        'POST',
        NET_RESOURCE_PATH % tenant_id,
        {"network": network},
        _("Unable to create remote network: %s"))
**** CubicPower OpenStack Study ****
def rest_update_network(self, tenant_id, net_id, network):
    """PUT ``network`` to the backend resource of ``net_id``.

    Failures are raised by ``rest_action`` as RemoteRestError.
    """
    self.rest_action(
        'PUT',
        NETWORKS_PATH % (tenant_id, net_id),
        {"network": network},
        _("Unable to update remote network: %s"))
**** CubicPower OpenStack Study ****
def rest_delete_network(self, tenant_id, net_id):
    """DELETE the backend resource of network ``net_id``.

    ``rest_action`` ignores 404 on DELETE; other failures are raised
    as RemoteRestError.
    """
    resource = NETWORKS_PATH % (tenant_id, net_id)
    # the message used to say "update", copied from rest_update_network
    errstr = _("Unable to delete remote network: %s")
    self.rest_action('DELETE', resource, errstr=errstr)
**** CubicPower OpenStack Study ****
def rest_create_port(self, tenant_id, net_id, port):
    """PUT ``port`` and its device attachment to the backend.

    Ports without a MAC address or without a ``device_id`` are skipped
    with a warning, since the controller only cares about ports attached
    to devices. Failures are raised by ``rest_action`` as RemoteRestError.
    """
    port_id = port["id"]
    resource = ATTACHMENT_PATH % (tenant_id, net_id, port_id)
    device_id = port.get("device_id")
    mac = port["mac_address"]
    if not (mac and device_id):
        LOG.warning(_("No device MAC attached to port %s. "
                      "Skipping notification to controller."), port_id)
        return
    payload = {
        "port": port,
        "attachment": {"id": device_id,
                       "mac": mac},
    }
    self.rest_action('PUT', resource, payload,
                     _("Unable to create remote port: %s"))
**** CubicPower OpenStack Study ****
def rest_delete_port(self, tenant_id, network_id, port_id):
    """DELETE the attachment resource of ``port_id`` on the backend.

    ``rest_action`` ignores 404 on DELETE; other failures are raised
    as RemoteRestError.
    """
    self.rest_action(
        'DELETE',
        ATTACHMENT_PATH % (tenant_id, network_id, port_id),
        errstr=_("Unable to delete remote port: %s"))
**** CubicPower OpenStack Study ****
def rest_update_port(self, tenant_id, net_id, port):
    """Update a port by re-sending it through ``rest_create_port``.

    The controller has no update operation for the port endpoint; the
    PUT issued by the create method replaces the existing port.
    """
    self.rest_create_port(tenant_id, net_id, port)
**** CubicPower OpenStack Study ****
def rest_create_floatingip(self, tenant_id, floatingip):
    """PUT ``floatingip`` to the backend under its own id.

    The floating IP dict is sent as the request body; previously the call
    omitted it, so the backend received an empty payload. Failures are
    raised by ``rest_action`` as RemoteRestError.
    """
    resource = FLOATINGIPS_PATH % (tenant_id, floatingip['id'])
    errstr = _("Unable to create floating IP: %s")
    self.rest_action('PUT', resource, floatingip, errstr=errstr)
**** CubicPower OpenStack Study ****
def rest_update_floatingip(self, tenant_id, floatingip, oldid):
    """PUT the updated ``floatingip`` to the backend resource ``oldid``.

    The floating IP dict is sent as the request body; previously the
    ``floatingip`` argument was never used, so the backend received an
    empty payload. Failures are raised by ``rest_action`` as
    RemoteRestError.
    """
    resource = FLOATINGIPS_PATH % (tenant_id, oldid)
    errstr = _("Unable to update floating IP: %s")
    self.rest_action('PUT', resource, floatingip, errstr=errstr)
**** CubicPower OpenStack Study ****
def rest_delete_floatingip(self, tenant_id, oldid):
    """DELETE the floating IP resource ``oldid`` on the backend.

    ``rest_action`` ignores 404 on DELETE; other failures are raised
    as RemoteRestError.
    """
    self.rest_action(
        'DELETE',
        FLOATINGIPS_PATH % (tenant_id, oldid),
        errstr=_("Unable to delete floating IP: %s"))
**** CubicPower OpenStack Study ****
def _consistency_watchdog(self, polling_interval=60):
    """Periodically poll the backend so consistency errors are detected.

    Returns immediately (after a warning) when the servers do not
    advertise the 'consistency' capability. Otherwise loops forever,
    sleeping ``polling_interval`` seconds between GETs on HEALTH_PATH.
    """
    if 'consistency' not in self.get_capabilities():
        LOG.warning(_("Backend server(s) do not support automated "
                      "consitency checks."))
        return
    while True:
        # If consistency is supported, all we have to do is make any
        # rest call and the consistency header will be added. If it
        # doesn't match, the backend will return a synchronization error
        # that will be handled by the rest_call.
        eventlet.sleep(polling_interval)
        # rest_call has no defaults for data, headers and ignore_codes;
        # passing only action and resource raised a TypeError.
        self.rest_call('GET', HEALTH_PATH, '', {}, [])
**** CubicPower OpenStack Study ****
class HTTPSConnectionWithValidation(httplib.HTTPSConnection):
    """HTTPS connection that checks the peer against ``combined_cert``."""

    # If combined_cert is None, the connection will continue without
    # any certificate validation.
    combined_cert = None

    def connect(self):
        """Open the TCP connection and wrap the socket with SSL.

        With ``combined_cert`` set, a peer certificate is required and
        checked against that file; otherwise no certificate is required.
        """
        raw_sock = socket.create_connection((self.host, self.port),
                                            self.timeout, self.source_address)
        if self._tunnel_host:
            # the tunnel is set up on the plain socket first
            self.sock = raw_sock
            self._tunnel()

        if self.combined_cert:
            verify_opts = {'cert_reqs': ssl.CERT_REQUIRED,
                           'ca_certs': self.combined_cert}
        else:
            verify_opts = {'cert_reqs': ssl.CERT_NONE}
        self.sock = ssl.wrap_socket(raw_sock, self.key_file, self.cert_file,
                                    **verify_opts)