File: //lib64/python2.7/site-packages/pymongo/mongo_client.py
# Copyright 2009-2012 10gen, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Tools for connecting to MongoDB.
.. seealso:: Module :mod:`~pymongo.master_slave_connection` for
connecting to master-slave clusters, and
:doc:`/examples/high_availability` for an example of how to connect
to a replica set, or specify a list of mongos instances for automatic
failover.
To get a :class:`~pymongo.database.Database` instance from a
:class:`MongoClient` use either dictionary-style or attribute-style
access:
.. doctest::
>>> from pymongo import MongoClient
>>> c = MongoClient()
>>> c.test_database
Database(MongoClient('localhost', 27017), u'test_database')
>>> c['test-database']
Database(MongoClient('localhost', 27017), u'test-database')
"""
import datetime
import random
import socket
import struct
import time
import warnings
from bson.py3compat import b
from pymongo import (auth,
common,
database,
helpers,
message,
pool,
uri_parser)
from pymongo.common import HAS_SSL
from pymongo.cursor_manager import CursorManager
from pymongo.errors import (AutoReconnect,
ConfigurationError,
ConnectionFailure,
DuplicateKeyError,
InvalidDocument,
InvalidURI,
OperationFailure)
EMPTY = b("")
def _partition_node(node):
"""Split a host:port string returned from mongod/s into
a (host, int(port)) pair needed for socket.connect().
"""
host = node
port = 27017
idx = node.rfind(':')
if idx != -1:
host, port = node[:idx], int(node[idx + 1:])
if host.startswith('['):
host = host[1:-1]
return host, port
class MongoClient(common.BaseObject):
"""Connection to MongoDB.
"""
HOST = "localhost"
PORT = 27017
__max_bson_size = 4 * 1024 * 1024
def __init__(self, host=None, port=None, max_pool_size=10,
document_class=dict, tz_aware=False, _connect=True, **kwargs):
"""Create a new connection to a single MongoDB instance at *host:port*.
The resultant client object has connection-pooling built
in. It also performs auto-reconnection when necessary. If an
operation fails because of a connection error,
:class:`~pymongo.errors.ConnectionFailure` is raised. If
auto-reconnection will be performed,
:class:`~pymongo.errors.AutoReconnect` will be
raised. Application code should handle this exception
(recognizing that the operation failed) and then continue to
execute.
Raises :class:`TypeError` if port is not an instance of
``int``. Raises :class:`~pymongo.errors.ConnectionFailure` if
the connection cannot be made.
The `host` parameter can be a full `mongodb URI
<http://dochub.mongodb.org/core/connections>`_, in addition to
a simple hostname. It can also be a list of hostnames or
URIs. Any port specified in the host string(s) will override
the `port` parameter. If multiple mongodb URIs containing
database or auth information are passed, the last database,
username, and password present will be used. For username and
passwords reserved characters like ':', '/', '+' and '@' must be
escaped following RFC 2396.
:Parameters:
- `host` (optional): hostname or IP address of the
instance to connect to, or a mongodb URI, or a list of
hostnames / mongodb URIs. If `host` is an IPv6 literal
it must be enclosed in '[' and ']' characters following
the RFC2732 URL syntax (e.g. '[::1]' for localhost)
- `port` (optional): port number on which to connect
- `max_pool_size` (optional): The maximum number of idle connections
to keep open in the pool for future use
- `document_class` (optional): default class to use for
documents returned from queries on this client
- `tz_aware` (optional): if ``True``,
:class:`~datetime.datetime` instances returned as values
in a document by this :class:`MongoClient` will be timezone
aware (otherwise they will be naive)
| **Other optional parameters can be passed as keyword arguments:**
- `socketTimeoutMS`: (integer) How long (in milliseconds) a send or
receive on a socket can take before timing out.
- `connectTimeoutMS`: (integer) How long (in milliseconds) a
connection can take to be opened before timing out.
- `auto_start_request`: If ``True``, each thread that accesses
this :class:`MongoClient` has a socket allocated to it for the
thread's lifetime. This ensures consistent reads, even if you
read after an unacknowledged write. Defaults to ``False``
- `use_greenlets`: If ``True``, :meth:`start_request()` will ensure
that the current greenlet uses the same socket for all
operations until :meth:`end_request()`
| **Write Concern options:**
- `w`: (integer or string) If this is a replica set, write operations
will block until they have been replicated to the specified number
or tagged set of servers. `w=<int>` always includes the replica set
primary (e.g. w=3 means write to the primary and wait until
replicated to **two** secondaries). Passing w=0 **disables write
acknowledgement** and all other write concern options.
- `wtimeout`: (integer) Used in conjunction with `w`. Specify a value
in milliseconds to control how long to wait for write propagation
to complete. If replication does not complete in the given
timeframe, a timeout exception is raised.
- `j`: If ``True`` block until write operations have been committed
to the journal. Ignored if the server is running without journaling.
- `fsync`: If ``True`` force the database to fsync all files before
returning. When used with `j` the server awaits the next group
commit before returning.
| **Replica set keyword arguments for connecting with a replica set
- either directly or via a mongos:**
| (ignored by standalone mongod instances)
- `replicaSet`: (string) The name of the replica set to connect to.
The driver will verify that the replica set it connects to matches
this name. Implies that the hosts specified are a seed list and the
driver should attempt to find all members of the set. *Ignored by
mongos*.
- `read_preference`: The read preference for this client. If
connecting to a secondary then a read preference mode *other* than
PRIMARY is required - otherwise all queries will throw
:class:`~pymongo.errors.AutoReconnect` "not master".
See :class:`~pymongo.read_preferences.ReadPreference` for all
available read preference options.
- `tag_sets`: Ignored unless connecting to a replica set via mongos.
Specify a priority-order for tag sets, provide a list of
tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
set, ``{}``, means "read from any member that matches the mode,
ignoring tags.
| **SSL configuration:**
- `ssl`: If ``True``, create the connection to the server using SSL.
- `ssl_keyfile`: The private keyfile used to identify the local
connection against mongod. If included with the ``certfile` then
only the ``ssl_certfile`` is needed. Implies ``ssl=True``.
- `ssl_certfile`: The certificate file used to identify the local
connection against mongod. Implies ``ssl=True``.
- `ssl_cert_reqs`: Specifies whether a certificate is required from
the other side of the connection, and whether it will be validated
if provided. It must be one of the three values ``ssl.CERT_NONE``
(certificates ignored), ``ssl.CERT_OPTIONAL``
(not required, but validated if provided), or ``ssl.CERT_REQUIRED``
(required and validated). If the value of this parameter is not
``ssl.CERT_NONE``, then the ``ssl_ca_certs`` parameter must point
to a file of CA certificates. Implies ``ssl=True``.
- `ssl_ca_certs`: The ca_certs file contains a set of concatenated
"certification authority" certificates, which are used to validate
certificates passed from the other end of the connection.
Implies ``ssl=True``.
.. seealso:: :meth:`end_request`
.. mongodoc:: connections
.. versionchanged:: 2.5
Added additional ssl options
.. versionadded:: 2.4
"""
if host is None:
host = self.HOST
if isinstance(host, basestring):
host = [host]
if port is None:
port = self.PORT
if not isinstance(port, int):
raise TypeError("port must be an instance of int")
seeds = set()
username = None
password = None
db_name = None
opts = {}
for entity in host:
if "://" in entity:
if entity.startswith("mongodb://"):
res = uri_parser.parse_uri(entity, port)
seeds.update(res["nodelist"])
username = res["username"] or username
password = res["password"] or password
db_name = res["database"] or db_name
opts = res["options"]
else:
idx = entity.find("://")
raise InvalidURI("Invalid URI scheme: "
"%s" % (entity[:idx],))
else:
seeds.update(uri_parser.split_hosts(entity, port))
if not seeds:
raise ConfigurationError("need to specify at least one host")
self.__nodes = seeds
self.__host = None
self.__port = None
self.__is_primary = False
self.__is_mongos = False
# _pool_class option is for deep customization of PyMongo, e.g. Motor.
# SHOULD NOT BE USED BY DEVELOPERS EXTERNAL TO 10GEN.
pool_class = kwargs.pop('_pool_class', pool.Pool)
options = {}
for option, value in kwargs.iteritems():
option, value = common.validate(option, value)
options[option] = value
options.update(opts)
self.__max_pool_size = common.validate_positive_integer(
'max_pool_size', max_pool_size)
self.__cursor_manager = CursorManager(self)
self.__repl = options.get('replicaset')
if len(seeds) == 1 and not self.__repl:
self.__direct = True
else:
self.__direct = False
self.__nodes = set()
self.__net_timeout = options.get('sockettimeoutms')
self.__conn_timeout = options.get('connecttimeoutms')
self.__use_ssl = options.get('ssl', None)
self.__ssl_keyfile = options.get('ssl_keyfile', None)
self.__ssl_certfile = options.get('ssl_certfile', None)
self.__ssl_cert_reqs = options.get('ssl_cert_reqs', None)
self.__ssl_ca_certs = options.get('ssl_ca_certs', None)
ssl_kwarg_keys = [k for k in kwargs.keys() if k.startswith('ssl_')]
if self.__use_ssl == False and ssl_kwarg_keys:
raise ConfigurationError("ssl has not been enabled but the "
"following ssl parameters have been set: "
"%s. Please set `ssl=True` or remove."
% ', '.join(ssl_kwarg_keys))
if self.__ssl_cert_reqs and not self.__ssl_ca_certs:
raise ConfigurationError("If `ssl_cert_reqs` is not "
"`ssl.CERT_NONE` then you must "
"include `ssl_ca_certs` to be able "
"to validate the server.")
if ssl_kwarg_keys and self.__use_ssl is None:
# ssl options imply ssl = True
self.__use_ssl = True
if self.__use_ssl and not HAS_SSL:
raise ConfigurationError("The ssl module is not available. If you "
"are using a python version previous to "
"2.6 you must install the ssl package "
"from PyPI.")
self.__use_greenlets = options.get('use_greenlets', False)
self.__pool = pool_class(
None,
self.__max_pool_size,
self.__net_timeout,
self.__conn_timeout,
self.__use_ssl,
use_greenlets=self.__use_greenlets,
ssl_keyfile=self.__ssl_keyfile,
ssl_certfile=self.__ssl_certfile,
ssl_cert_reqs=self.__ssl_cert_reqs,
ssl_ca_certs=self.__ssl_ca_certs)
self.__document_class = document_class
self.__tz_aware = common.validate_boolean('tz_aware', tz_aware)
self.__auto_start_request = options.get('auto_start_request', False)
# cache of existing indexes used by ensure_index ops
self.__index_cache = {}
self.__auth_credentials = {}
super(MongoClient, self).__init__(**options)
if self.slave_okay:
warnings.warn("slave_okay is deprecated. Please "
"use read_preference instead.", DeprecationWarning,
stacklevel=2)
if _connect:
try:
self.__find_node(seeds)
except AutoReconnect, e:
# ConnectionFailure makes more sense here than AutoReconnect
raise ConnectionFailure(str(e))
db_name = options.get('authsource', db_name)
if db_name and username is None:
warnings.warn("database name or authSource in URI is being "
"ignored. If you wish to authenticate to %s, you "
"must provide a username and password." % (db_name,))
if username:
mechanism = options.get('authmechanism', 'MONGODB-CR')
if mechanism == 'GSSAPI':
source = '$external'
else:
source = db_name or 'admin'
credentials = (source, unicode(username),
unicode(password), mechanism)
try:
self._cache_credentials(source, credentials, _connect)
except OperationFailure, exc:
raise ConfigurationError(str(exc))
def _cached(self, dbname, coll, index):
"""Test if `index` is cached.
"""
cache = self.__index_cache
now = datetime.datetime.utcnow()
return (dbname in cache and
coll in cache[dbname] and
index in cache[dbname][coll] and
now < cache[dbname][coll][index])
def _cache_index(self, database, collection, index, cache_for):
"""Add an index to the index cache for ensure_index operations.
"""
now = datetime.datetime.utcnow()
expire = datetime.timedelta(seconds=cache_for) + now
if database not in self.__index_cache:
self.__index_cache[database] = {}
self.__index_cache[database][collection] = {}
self.__index_cache[database][collection][index] = expire
elif collection not in self.__index_cache[database]:
self.__index_cache[database][collection] = {}
self.__index_cache[database][collection][index] = expire
else:
self.__index_cache[database][collection][index] = expire
def _purge_index(self, database_name,
collection_name=None, index_name=None):
"""Purge an index from the index cache.
If `index_name` is None purge an entire collection.
If `collection_name` is None purge an entire database.
"""
if not database_name in self.__index_cache:
return
if collection_name is None:
del self.__index_cache[database_name]
return
if not collection_name in self.__index_cache[database_name]:
return
if index_name is None:
del self.__index_cache[database_name][collection_name]
return
if index_name in self.__index_cache[database_name][collection_name]:
del self.__index_cache[database_name][collection_name][index_name]
def _cache_credentials(self, source, credentials, connect=True):
"""Add credentials to the database authentication cache
for automatic login when a socket is created. If `connect` is True,
verify the credentials on the server first.
"""
if source in self.__auth_credentials:
# Nothing to do if we already have these credentials.
if credentials == self.__auth_credentials[source]:
return
raise OperationFailure('Another user is already authenticated '
'to this database. You must logout first.')
if connect:
sock_info = self.__socket()
try:
# Since __check_auth was called in __socket
# there is no need to call it here.
auth.authenticate(credentials, sock_info, self.__simple_command)
sock_info.authset.add(credentials)
finally:
self.__pool.maybe_return_socket(sock_info)
self.__auth_credentials[source] = credentials
def _purge_credentials(self, source):
"""Purge credentials from the database authentication cache.
"""
if source in self.__auth_credentials:
del self.__auth_credentials[source]
def __check_auth(self, sock_info):
"""Authenticate using cached database credentials.
"""
if self.__auth_credentials or sock_info.authset:
cached = set(self.__auth_credentials.itervalues())
authset = sock_info.authset.copy()
# Logout any credentials that no longer exist in the cache.
for credentials in authset - cached:
self.__simple_command(sock_info, credentials[0], {'logout': 1})
sock_info.authset.discard(credentials)
for credentials in cached - authset:
auth.authenticate(credentials,
sock_info, self.__simple_command)
sock_info.authset.add(credentials)
@property
def host(self):
"""Current connected host.
.. versionchanged:: 1.3
``host`` is now a property rather than a method.
"""
return self.__host
@property
def port(self):
"""Current connected port.
.. versionchanged:: 1.3
``port`` is now a property rather than a method.
"""
return self.__port
@property
def is_primary(self):
"""If this instance is connected to a standalone, a replica set
primary, or the master of a master-slave set.
.. versionadded:: 2.3
"""
return self.__is_primary
@property
def is_mongos(self):
"""If this instance is connected to mongos.
.. versionadded:: 2.3
"""
return self.__is_mongos
@property
def max_pool_size(self):
"""The maximum number of idle connections kept open in the pool for
future use.
.. note:: ``max_pool_size`` does not cap the number of concurrent
connections to the server; there is currently no way to limit the
number of connections. ``max_pool_size`` only limits the number of
**idle** connections kept open when they are returned to the pool.
.. versionadded:: 1.11
"""
return self.__max_pool_size
@property
def use_greenlets(self):
"""Whether calling :meth:`start_request` assigns greenlet-local,
rather than thread-local, sockets.
.. versionadded:: 2.4.2
"""
return self.__use_greenlets
@property
def nodes(self):
"""List of all known nodes.
Includes both nodes specified when this instance was created,
as well as nodes discovered through the replica set discovery
mechanism.
.. versionadded:: 1.8
"""
return self.__nodes
@property
def auto_start_request(self):
"""Is auto_start_request enabled?
"""
return self.__auto_start_request
def get_document_class(self):
return self.__document_class
def set_document_class(self, klass):
self.__document_class = klass
document_class = property(get_document_class, set_document_class,
doc="""Default class to use for documents
returned from this client.
.. versionadded:: 1.7
""")
@property
def tz_aware(self):
"""Does this client return timezone-aware datetimes?
.. versionadded:: 1.8
"""
return self.__tz_aware
@property
def max_bson_size(self):
"""Return the maximum size BSON object the connected server
accepts in bytes. Defaults to 4MB in server < 1.7.4.
.. versionadded:: 1.10
"""
return self.__max_bson_size
def __simple_command(self, sock_info, dbname, spec):
"""Send a command to the server.
"""
rqst_id, msg, _ = message.query(0, dbname + '.$cmd', 0, -1, spec)
start = time.time()
try:
sock_info.sock.sendall(msg)
response = self.__receive_message_on_socket(1, rqst_id, sock_info)
except:
sock_info.close()
raise
end = time.time()
response = helpers._unpack_response(response)['data'][0]
msg = "command %r failed: %%s" % spec
helpers._check_command_response(response, None, msg)
return response, end - start
def __try_node(self, node):
"""Try to connect to this node and see if it works for our connection
type. Returns ((host, port), ismaster, isdbgrid, res_time).
:Parameters:
- `node`: The (host, port) pair to try.
"""
self.disconnect()
self.__host, self.__port = node
# Call 'ismaster' directly so we can get a response time.
sock_info = self.__socket()
response, res_time = self.__simple_command(sock_info,
'admin',
{'ismaster': 1})
self.__pool.maybe_return_socket(sock_info)
# Are we talking to a mongos?
isdbgrid = response.get('msg', '') == 'isdbgrid'
if "maxBsonObjectSize" in response:
self.__max_bson_size = response["maxBsonObjectSize"]
# Replica Set?
if not self.__direct:
# Check that this host is part of the given replica set.
if self.__repl:
set_name = response.get('setName')
# The 'setName' field isn't returned by mongod before 1.6.2
# so we can't assume that if it's missing this host isn't in
# the specified set.
if set_name and set_name != self.__repl:
raise ConfigurationError("%s:%d is not a member of "
"replica set %s"
% (node[0], node[1], self.__repl))
if "hosts" in response:
self.__nodes = set([_partition_node(h)
for h in response["hosts"]])
else:
# The user passed a seed list of standalone or
# mongos instances.
self.__nodes.add(node)
if response["ismaster"]:
return node, True, isdbgrid, res_time
elif "primary" in response:
candidate = _partition_node(response["primary"])
return self.__try_node(candidate)
# Explain why we aren't using this connection.
raise AutoReconnect('%s:%d is not primary or master' % node)
# Direct connection
if response.get("arbiterOnly", False) and not self.__direct:
raise ConfigurationError("%s:%d is an arbiter" % node)
return node, response['ismaster'], isdbgrid, res_time
def __pick_nearest(self, candidates):
"""Return the 'nearest' candidate based on response time.
"""
latency = self.secondary_acceptable_latency_ms
# Only used for mongos high availability, res_time is in seconds.
fastest = min([res_time for candidate, res_time in candidates])
near_candidates = [
candidate for candidate, res_time in candidates
if res_time - fastest < latency / 1000.0
]
node = random.choice(near_candidates)
# Clear the pool from the last choice.
self.disconnect()
self.__host, self.__port = node
return node
def __find_node(self, seeds=None):
"""Find a host, port pair suitable for our connection type.
If only one host was supplied to __init__ see if we can connect
to it. Don't check if the host is a master/primary so we can make
a direct connection to read from a secondary or send commands to
an arbiter.
If more than one host was supplied treat them as a seed list for
connecting to a replica set or to support high availability for
mongos. If connecting to a replica set try to find the primary
and fail if we can't, possibly updating any replSet information
on success. If a mongos seed list was provided find the "nearest"
mongos and return it.
Otherwise we iterate through the list trying to find a host we can
send write operations to.
Sets __host and __port so that :attr:`host` and :attr:`port`
will return the address of the connected host. Sets __is_primary to
True if this is a primary or master, else False. Sets __is_mongos
to True if the connection is to a mongos.
"""
errors = []
mongos_candidates = []
candidates = seeds or self.__nodes.copy()
for candidate in candidates:
try:
node, ismaster, isdbgrid, res_time = self.__try_node(candidate)
self.__is_primary = ismaster
self.__is_mongos = isdbgrid
# No need to calculate nearest if we only have one mongos.
if isdbgrid and not self.__direct:
mongos_candidates.append((node, res_time))
continue
elif len(mongos_candidates):
raise ConfigurationError("Seed list cannot contain a mix "
"of mongod and mongos instances.")
return node
except Exception, why:
errors.append(str(why))
# If we have a mongos seed list, pick the "nearest" member.
if len(mongos_candidates):
self.__is_mongos = True
return self.__pick_nearest(mongos_candidates)
# Otherwise, try any hosts we discovered that were not in the seed list.
for candidate in self.__nodes - candidates:
try:
node, ismaster, isdbgrid, _ = self.__try_node(candidate)
self.__is_primary = ismaster
self.__is_mongos = isdbgrid
return node
except Exception, why:
errors.append(str(why))
# Couldn't find a suitable host.
self.disconnect()
raise AutoReconnect(', '.join(errors))
def __socket(self):
"""Get a SocketInfo from the pool.
"""
host, port = (self.__host, self.__port)
if host is None or (port is None and '/' not in host):
host, port = self.__find_node()
try:
if self.auto_start_request and not self.in_request():
self.start_request()
sock_info = self.__pool.get_socket((host, port))
except socket.error, why:
self.disconnect()
# Check if a unix domain socket
if host.endswith('.sock'):
host_details = "%s:" % host
else:
host_details = "%s:%d:" % (host, port)
raise AutoReconnect("could not connect to "
"%s %s" % (host_details, str(why)))
try:
self.__check_auth(sock_info)
except OperationFailure:
self.__pool.maybe_return_socket(sock_info)
raise
return sock_info
def disconnect(self):
"""Disconnect from MongoDB.
Disconnecting will close all underlying sockets in the connection
pool. If this instance is used again it will be automatically
re-opened. Care should be taken to make sure that :meth:`disconnect`
is not called in the middle of a sequence of operations in which
ordering is important. This could lead to unexpected results.
.. seealso:: :meth:`end_request`
.. versionadded:: 1.3
"""
self.__pool.reset()
self.__host = None
self.__port = None
def close(self):
"""Alias for :meth:`disconnect`
Disconnecting will close all underlying sockets in the connection
pool. If this instance is used again it will be automatically
re-opened. Care should be taken to make sure that :meth:`disconnect`
is not called in the middle of a sequence of operations in which
ordering is important. This could lead to unexpected results.
.. seealso:: :meth:`end_request`
.. versionadded:: 2.1
"""
self.disconnect()
def alive(self):
"""Return ``False`` if there has been an error communicating with the
server, else ``True``.
This method attempts to check the status of the server with minimal I/O.
The current thread / greenlet retrieves a socket from the pool (its
request socket if it's in a request, or a random idle socket if it's not
in a request) and checks whether calling `select`_ on it raises an
error. If there are currently no idle sockets, :meth:`alive` will
attempt to actually connect to the server.
A more certain way to determine server availability is::
client.admin.command('ping')
.. _select: http://docs.python.org/2/library/select.html#select.select
"""
# In the common case, a socket is available and was used recently, so
# calling select() on it is a reasonable attempt to see if the OS has
# reported an error. Note this can be wasteful: __socket implicitly
# calls select() if the socket hasn't been checked in the last second,
# or it may create a new socket, in which case calling select() is
# redundant.
try:
sock_info = self.__socket()
return not pool._closed(sock_info.sock)
except (socket.error, ConnectionFailure):
return False
def set_cursor_manager(self, manager_class):
"""Set this client's cursor manager.
Raises :class:`TypeError` if `manager_class` is not a subclass of
:class:`~pymongo.cursor_manager.CursorManager`. A cursor manager
handles closing cursors. Different managers can implement different
policies in terms of when to actually kill a cursor that has
been closed.
:Parameters:
- `manager_class`: cursor manager to use
.. versionchanged:: 2.1+
Deprecated support for external cursor managers.
"""
warnings.warn("Support for external cursor managers is deprecated "
"and will be removed in PyMongo 3.0.",
DeprecationWarning, stacklevel=2)
manager = manager_class(self)
if not isinstance(manager, CursorManager):
raise TypeError("manager_class must be a subclass of "
"CursorManager")
self.__cursor_manager = manager
def __check_response_to_last_error(self, response):
"""Check a response to a lastError message for errors.
`response` is a byte string representing a response to the message.
If it represents an error response we raise OperationFailure.
Return the response as a document.
"""
response = helpers._unpack_response(response)
assert response["number_returned"] == 1
error = response["data"][0]
helpers._check_command_response(error, self.disconnect)
error_msg = error.get("err", "")
if error_msg is None:
return error
if error_msg.startswith("not master"):
self.disconnect()
raise AutoReconnect(error_msg)
details = error
# mongos returns the error code in an error object
# for some errors.
if "errObjects" in error:
for errobj in error["errObjects"]:
if errobj["err"] == error_msg:
details = errobj
break
if "code" in details:
if details["code"] in (11000, 11001, 12582):
raise DuplicateKeyError(details["err"], details["code"])
else:
raise OperationFailure(details["err"], details["code"])
else:
raise OperationFailure(details["err"])
def __check_bson_size(self, message):
"""Make sure the message doesn't include BSON documents larger
than the connected server will accept.
:Parameters:
- `message`: message to check
"""
if len(message) == 3:
(request_id, data, max_doc_size) = message
if max_doc_size > self.__max_bson_size:
raise InvalidDocument("BSON document too large (%d bytes)"
" - the connected server supports"
" BSON document sizes up to %d"
" bytes." %
(max_doc_size, self.__max_bson_size))
return (request_id, data)
else:
# get_more and kill_cursors messages
# don't include BSON documents.
return message
def _send_message(self, message, with_last_error=False, check_primary=True):
"""Say something to Mongo.
Raises ConnectionFailure if the message cannot be sent. Raises
OperationFailure if `with_last_error` is ``True`` and the
response to the getLastError call returns an error. Return the
response from lastError, or ``None`` if `with_last_error`
is ``False``.
:Parameters:
- `message`: message to send
- `with_last_error`: check getLastError status after sending the
message
- `check_primary`: don't try to write to a non-primary; see
kill_cursors for an exception to this rule
"""
if check_primary and not with_last_error and not self.is_primary:
# The write won't succeed, bail as if we'd done a getLastError
raise AutoReconnect("not master")
sock_info = self.__socket()
try:
(request_id, data) = self.__check_bson_size(message)
sock_info.sock.sendall(data)
# Safe mode. We pack the message together with a lastError
# message and send both. We then get the response (to the
# lastError) and raise OperationFailure if it is an error
# response.
rv = None
if with_last_error:
response = self.__receive_message_on_socket(1, request_id,
sock_info)
rv = self.__check_response_to_last_error(response)
self.__pool.maybe_return_socket(sock_info)
return rv
except OperationFailure:
self.__pool.maybe_return_socket(sock_info)
raise
except (ConnectionFailure, socket.error), e:
self.disconnect()
raise AutoReconnect(str(e))
except:
sock_info.close()
raise
def __receive_data_on_socket(self, length, sock_info):
"""Lowest level receive operation.
Takes length to receive and repeatedly calls recv until able to
return a buffer of that length, raising ConnectionFailure on error.
"""
message = EMPTY
while length:
chunk = sock_info.sock.recv(length)
if chunk == EMPTY:
raise ConnectionFailure("connection closed")
length -= len(chunk)
message += chunk
return message
def __receive_message_on_socket(self, operation, request_id, sock_info):
"""Receive a message in response to `request_id` on `sock`.
Returns the response data with the header removed.
"""
header = self.__receive_data_on_socket(16, sock_info)
length = struct.unpack("<i", header[:4])[0]
assert request_id == struct.unpack("<i", header[8:12])[0], \
"ids don't match %r %r" % (request_id,
struct.unpack("<i", header[8:12])[0])
assert operation == struct.unpack("<i", header[12:])[0]
return self.__receive_data_on_socket(length - 16, sock_info)
def __send_and_receive(self, message, sock_info):
"""Send a message on the given socket and return the response data.
"""
(request_id, data) = self.__check_bson_size(message)
try:
sock_info.sock.sendall(data)
return self.__receive_message_on_socket(1, request_id, sock_info)
except:
sock_info.close()
raise
# we just ignore _must_use_master here: it's only relevant for
# MasterSlaveConnection instances.
def _send_message_with_response(self, message,
_must_use_master=False, **kwargs):
"""Send a message to Mongo and return the response.
Sends the given message and returns the response.
:Parameters:
- `message`: (request_id, data) pair making up the message to send
"""
sock_info = self.__socket()
try:
try:
if "network_timeout" in kwargs:
sock_info.sock.settimeout(kwargs["network_timeout"])
return self.__send_and_receive(message, sock_info)
except (ConnectionFailure, socket.error), e:
self.disconnect()
raise AutoReconnect(str(e))
finally:
if "network_timeout" in kwargs:
try:
# Restore the socket's original timeout and return it to
# the pool
sock_info.sock.settimeout(self.__net_timeout)
self.__pool.maybe_return_socket(sock_info)
except socket.error:
# There was an exception and we've closed the socket
pass
else:
self.__pool.maybe_return_socket(sock_info)
def start_request(self):
"""Ensure the current thread or greenlet always uses the same socket
until it calls :meth:`end_request`. This ensures consistent reads,
even if you read after an unacknowledged write.
In Python 2.6 and above, or in Python 2.5 with
"from __future__ import with_statement", :meth:`start_request` can be
used as a context manager:
>>> client = pymongo.MongoClient(auto_start_request=False)
>>> db = client.test
>>> _id = db.test_collection.insert({})
>>> with client.start_request():
... for i in range(100):
... db.test_collection.update({'_id': _id}, {'$set': {'i':i}})
...
... # Definitely read the document after the final update completes
... print db.test_collection.find({'_id': _id})
If a thread or greenlet calls start_request multiple times, an equal
number of calls to :meth:`end_request` is required to end the request.
.. versionchanged:: 2.4
Now counts the number of calls to start_request and doesn't end
request until an equal number of calls to end_request.
.. versionadded:: 2.2
The :class:`~pymongo.pool.Request` return value.
:meth:`start_request` previously returned None
"""
self.__pool.start_request()
return pool.Request(self)
def in_request(self):
"""True if this thread is in a request, meaning it has a socket
reserved for its exclusive use.
"""
return self.__pool.in_request()
def end_request(self):
"""Undo :meth:`start_request`. If :meth:`end_request` is called as many
times as :meth:`start_request`, the request is over and this thread's
connection returns to the pool. Extra calls to :meth:`end_request` have
no effect.
Ending a request allows the :class:`~socket.socket` that has
been reserved for this thread by :meth:`start_request` to be returned to
the pool. Other threads will then be able to re-use that
:class:`~socket.socket`. If your application uses many threads, or has
long-running threads that infrequently perform MongoDB operations, then
judicious use of this method can lead to performance gains. Care should
be taken, however, to make sure that :meth:`end_request` is not called
in the middle of a sequence of operations in which ordering is
important. This could lead to unexpected results.
"""
self.__pool.end_request()
def __eq__(self, other):
if isinstance(other, self.__class__):
us = (self.__host, self.__port)
them = (other.__host, other.__port)
return us == them
return NotImplemented
def __ne__(self, other):
return not self == other
def __repr__(self):
if len(self.__nodes) == 1:
return "MongoClient(%r, %r)" % (self.__host, self.__port)
else:
return "MongoClient(%r)" % ["%s:%d" % n for n in self.__nodes]
def __getattr__(self, name):
"""Get a database by name.
Raises :class:`~pymongo.errors.InvalidName` if an invalid
database name is used.
:Parameters:
- `name`: the name of the database to get
"""
return database.Database(self, name)
def __getitem__(self, name):
"""Get a database by name.
Raises :class:`~pymongo.errors.InvalidName` if an invalid
database name is used.
:Parameters:
- `name`: the name of the database to get
"""
return self.__getattr__(name)
def close_cursor(self, cursor_id):
"""Close a single database cursor.
Raises :class:`TypeError` if `cursor_id` is not an instance of
``(int, long)``. What closing the cursor actually means
depends on this client's cursor manager.
:Parameters:
- `cursor_id`: id of cursor to close
"""
if not isinstance(cursor_id, (int, long)):
raise TypeError("cursor_id must be an instance of (int, long)")
self.__cursor_manager.close(cursor_id)
def kill_cursors(self, cursor_ids):
"""Send a kill cursors message with the given ids.
Raises :class:`TypeError` if `cursor_ids` is not an instance of
``list``.
:Parameters:
- `cursor_ids`: list of cursor ids to kill
"""
if not isinstance(cursor_ids, list):
raise TypeError("cursor_ids must be a list")
return self._send_message(
message.kill_cursors(cursor_ids), check_primary=False)
def server_info(self):
"""Get information about the MongoDB server we're connected to.
"""
return self.admin.command("buildinfo")
def database_names(self):
"""Get a list of the names of all databases on the connected server.
"""
return [db["name"] for db in
self.admin.command("listDatabases")["databases"]]
def drop_database(self, name_or_database):
"""Drop a database.
Raises :class:`TypeError` if `name_or_database` is not an instance of
:class:`basestring` (:class:`str` in python 3) or Database.
:Parameters:
- `name_or_database`: the name of a database to drop, or a
:class:`~pymongo.database.Database` instance representing the
database to drop
"""
name = name_or_database
if isinstance(name, database.Database):
name = name.name
if not isinstance(name, basestring):
raise TypeError("name_or_database must be an instance of "
"%s or Database" % (basestring.__name__,))
self._purge_index(name)
self[name].command("dropDatabase")
def copy_database(self, from_name, to_name,
from_host=None, username=None, password=None):
"""Copy a database, potentially from another host.
Raises :class:`TypeError` if `from_name` or `to_name` is not
an instance of :class:`basestring` (:class:`str` in python 3).
Raises :class:`~pymongo.errors.InvalidName` if `to_name` is
not a valid database name.
If `from_host` is ``None`` the current host is used as the
source. Otherwise the database is copied from `from_host`.
If the source database requires authentication, `username` and
`password` must be specified.
:Parameters:
- `from_name`: the name of the source database
- `to_name`: the name of the target database
- `from_host` (optional): host name to copy from
- `username` (optional): username for source database
- `password` (optional): password for source database
.. note:: Specifying `username` and `password` requires server
version **>= 1.3.3+**.
.. versionadded:: 1.5
"""
if not isinstance(from_name, basestring):
raise TypeError("from_name must be an instance "
"of %s" % (basestring.__name__,))
if not isinstance(to_name, basestring):
raise TypeError("to_name must be an instance "
"of %s" % (basestring.__name__,))
database._check_name(to_name)
command = {"fromdb": from_name, "todb": to_name}
if from_host is not None:
command["fromhost"] = from_host
try:
self.start_request()
if username is not None:
nonce = self.admin.command("copydbgetnonce",
fromhost=from_host)["nonce"]
command["username"] = username
command["nonce"] = nonce
command["key"] = auth._auth_key(nonce, username, password)
return self.admin.command("copydb", **command)
finally:
self.end_request()
@property
def is_locked(self):
"""Is this server locked? While locked, all write operations
are blocked, although read operations may still be allowed.
Use :meth:`unlock` to unlock.
.. versionadded:: 2.0
"""
ops = self.admin.current_op()
return bool(ops.get('fsyncLock', 0))
def fsync(self, **kwargs):
"""Flush all pending writes to datafiles.
:Parameters:
Optional parameters can be passed as keyword arguments:
- `lock`: If True lock the server to disallow writes.
- `async`: If True don't block while synchronizing.
.. warning:: `async` and `lock` can not be used together.
.. warning:: MongoDB does not support the `async` option
on Windows and will raise an exception on that
platform.
.. versionadded:: 2.0
"""
self.admin.command("fsync", **kwargs)
def unlock(self):
"""Unlock a previously locked server.
.. versionadded:: 2.0
"""
self.admin['$cmd'].sys.unlock.find_one()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.disconnect()
def __iter__(self):
return self
def next(self):
raise TypeError("'MongoClient' object is not iterable")