You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

553 lines
26 KiB

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Globalization;
using System.Threading.Tasks;
using log4net;
using MongoDB.Bson;
using MongoDB.Driver;
namespace Toogps.Mongo
{
public class MongoServer
{
/// <summary>
/// Server address string. InitClient passes it unchanged to the MongoClient constructor.
/// </summary>
private readonly string _mongodbServers;
/// <summary>
/// Database name used by the MongoDatabase property.
/// </summary>
private readonly string _mongoDbName;
/// <summary>
/// Replica set (cluster) name. Within this class it is currently only referenced
/// by the commented-out settings code in InitClient.
/// </summary>
private readonly string _replicaSetName;
/// <summary>
/// Journaling (durability) requirement. Within this class it is currently only
/// referenced by the commented-out settings code in InitClient.
/// </summary>
private bool _journal = false;
// Client instance created on first access of the Client property.
private MongoClient _client;
/// <summary>
/// Creates an instance whose server string, database name and replica set name
/// are read from the application settings "MongoDBServers", "MongoDBName" and
/// "MongoDBReplicaSetName".
/// </summary>
public MongoServer()
{
    var appSettings = ConfigurationManager.AppSettings;
    _replicaSetName = appSettings["MongoDBReplicaSetName"];
    _mongoDbName = appSettings["MongoDBName"];
    _mongodbServers = appSettings["MongoDBServers"];
}
/// <summary>
/// Creates an instance from explicitly supplied connection values.
/// Note that the replica set name is the FIRST argument.
/// </summary>
/// <param name="replicaSetName">Replica set (cluster) name.</param>
/// <param name="mongodbServers">Server address string.</param>
/// <param name="mongoDbName">Database name.</param>
public MongoServer(string replicaSetName, string mongodbServers, string mongoDbName)
{
    _replicaSetName = replicaSetName;
    _mongodbServers = mongodbServers;
    _mongoDbName = mongoDbName;
}
/// <summary>
/// Gets the MongoDB client. The client is created through InitClient the first
/// time this property is read and the same instance is returned afterwards.
/// </summary>
public MongoClient Client
{
    get { return _client ?? (_client = InitClient()); }
}
/// <summary>
/// Creates the MongoClient from the configured server string.
/// </summary>
/// <returns>A new MongoClient constructed from _mongodbServers.</returns>
private MongoClient InitClient()
{
// NOTE(review): _mongodbServers is handed to MongoClient as a single string, so it is
// presumably a full connection string (e.g. mongodb://host1,host2,host3/?slaveOk=true)
// rather than the "host:port,host:port" list that GetServerAddresses parses — confirm
// against the deployed configuration.
//
// Disabled settings-based setup, kept for reference. Within this class it is the only
// code that uses _replicaSetName, _journal and the private GetServerAddresses():
//var mongoClientSettings = new MongoClientSettings();
//if (!string.IsNullOrWhiteSpace(_replicaSetName))
//{
// mongoClientSettings.ClusterConfigurator = builder =>
// {
// builder.ConfigureCluster(settings => settings.With(serverSelectionTimeout: TimeSpan.FromSeconds(50)));
// };
// mongoClientSettings.ReplicaSetName = _replicaSetName;
// // Prefer reading from secondary nodes.
// mongoClientSettings.ReadPreference = new ReadPreference(ReadPreferenceMode.SecondaryPreferred);
//}
//mongoClientSettings.Servers = GetServerAddresses();
//mongoClientSettings.MaxConnectionPoolSize = 80;
//mongoClientSettings.WaitQueueSize = 150;
//mongoClientSettings.WriteConcern = new WriteConcern(1, null, null, _journal);
//_client = new MongoClient(mongoClientSettings);
//
// Original author's notes on write-concern levels (translated; source article:
// http://kyfxbl.iteye.com/blog/1952941):
//   w:0 (Unacknowledged)       - best performance, worst reliability; does not check whether
//                                the write succeeded, network errors are still caught.
//   w:1 (Acknowledged)         - basic reliability, MongoDB's default level; data still held
//                                in memory may be lost.
//   w:1 + journal:true         - (Journaled) reliable; ensures the data reaches disk.
//   w:2 (Replica Acknowledged) - may be combined with journal:true / j:true, in which case the
//                                call returns only after the journal write has also succeeded.
//
// Original author's notes on connection-string options (translated):
//   replicaSet=name       Verifies the replica set name. Implies connect=replicaSet.
//   slaveOk=true|false    true:  in connect=direct mode the driver connects to the first server
//                                even if it is not the primary; in connect=replicaSet mode the
//                                driver sends all writes to the primary and spreads reads over
//                                the secondaries.
//                         false: in connect=direct mode the driver looks for the primary; in
//                                connect=replicaSet mode the driver connects only to the primary
//                                and all reads and writes go to it.
//   safe=true|false       true:  after each update the driver sends getLastError to make sure
//                                the update succeeded (see also wtimeoutMS).
//                         false: the driver does not send getLastError after updates.
//   w=n                   The driver adds { w : n } to getLastError. Applies when safe=true.
//   wtimeoutMS=ms         The driver adds { wtimeout : ms } to getLastError. Applies when safe=true.
//   fsync=true|false      true:  the driver adds { fsync : true } to getLastError (safe=true).
//                         false: the driver does not add it.
//   journal=true|false    If true, sync to the journal (written to the journal before being
//                         committed to the database). Applies when safe=true.
//   connectTimeoutMS=ms   How long opening a connection may take.
//   socketTimeoutMS=ms    How long sending and receiving on sockets may take.
return new MongoClient(_mongodbServers);
}
/// <summary>
/// Parses a comma-separated list of "host:port" entries into server addresses.
/// </summary>
/// <param name="mongodbServers">
/// Comma-separated entries such as "10.0.0.1:27017,10.0.0.2:27018". Whitespace around
/// entries is ignored, blank entries (for example a trailing comma) are skipped, and an
/// entry without a port gets the MongoDB default port 27017.
/// </param>
/// <returns>One address per non-blank entry, in input order.</returns>
/// <exception cref="ArgumentNullException"><paramref name="mongodbServers"/> is null.</exception>
/// <exception cref="FormatException">A port is present but is not a valid integer.</exception>
public List<MongoServerAddress> GetServerAddresses(string mongodbServers)
{
    if (mongodbServers == null)
    {
        throw new ArgumentNullException("mongodbServers");
    }

    const int defaultPort = 27017;
    var addresses = new List<MongoServerAddress>();
    foreach (var rawEntry in mongodbServers.Split(','))
    {
        var entry = rawEntry.Trim();
        if (entry.Length == 0)
        {
            // Tolerate blank entries instead of failing on the missing port.
            continue;
        }

        var hostAndPort = entry.Split(':');
        var host = hostAndPort[0].Trim();
        var port = defaultPort;
        if (hostAndPort.Length > 1 && hostAndPort[1].Trim().Length > 0)
        {
            // Ports are machine-readable configuration, so parse them culture-invariantly.
            port = int.Parse(hostAndPort[1], CultureInfo.InvariantCulture);
        }

        addresses.Add(new MongoServerAddress(host, port));
    }
    return addresses;
}
/// <summary>
/// Parses the configured server string (_mongodbServers) into server addresses.
/// Delegates to the public overload so the parsing rules live in one place.
/// </summary>
/// <returns>The addresses produced by GetServerAddresses(string) for _mongodbServers.</returns>
private List<MongoServerAddress> GetServerAddresses()
{
    return GetServerAddresses(_mongodbServers);
}
/// <summary>
/// Gets the database named by _mongoDbName from the shared client.
/// No database-level settings (such as a read preference) are supplied.
/// </summary>
public IMongoDatabase MongoDatabase
{
    get
    {
        var client = Client;
        return client.GetDatabase(_mongoDbName);
    }
}
/// <summary>
/// Converts <paramref name="t"/> to a BsonDocument and inserts it synchronously.
/// </summary>
/// <typeparam name="T">Type of the object to store.</typeparam>
/// <param name="t">Object to insert.</param>
/// <param name="collectionName">Target collection name.</param>
/// <returns>true when the insert completed; false when an exception was caught and logged.</returns>
public bool InsertOne<T>(T t, string collectionName)
{
    try
    {
        var target = MongoDatabase.GetCollection<BsonDocument>(collectionName);
        target.InsertOne(t.ToBsonDocument());
        return true;
    }
    catch (Exception ex)
    {
        var log = LogManager.GetLogger(GetType());
        log.Error(String.Format("InsertOne collectionName {0},Exception {1}", collectionName, ex.Message));
        return false;
    }
}
/// <summary>
/// Converts <paramref name="t"/> to a BsonDocument and starts an asynchronous insert
/// without waiting for it to finish.
/// </summary>
/// <typeparam name="T">Type of the object to store.</typeparam>
/// <param name="t">Object to insert.</param>
/// <param name="collectionName">Target collection name.</param>
/// <remarks>
/// The method returns as soon as the insert has been started. Failures while starting are
/// logged by the catch block; failures of the insert itself are logged by a continuation,
/// because an un-awaited task's exception never reaches the surrounding try/catch.
/// </remarks>
public void InsertOneAsync<T>(T t, string collectionName)
{
    try
    {
        var collection = MongoDatabase.GetCollection<BsonDocument>(collectionName);
        var bd = t.ToBsonDocument();
        // Observe and log a faulted insert; without this the error would be lost silently.
        collection.InsertOneAsync(bd).ContinueWith(
            task => LogManager.GetLogger(this.GetType()).Error(String.Format("InsertOneAsync collectionName {0},Exception {1}", collectionName, task.Exception.GetBaseException().Message)),
            TaskContinuationOptions.OnlyOnFaulted);
    }
    catch (Exception ex)
    {
        LogManager.GetLogger(this.GetType()).Error(String.Format("InsertOneAsync collectionName {0},Exception {1}", collectionName, ex.Message));
    }
}
/// <summary>
/// Converts every element of <paramref name="list"/> to a BsonDocument and inserts
/// them synchronously in one InsertMany call.
/// </summary>
/// <typeparam name="T">Type of the objects to store.</typeparam>
/// <param name="list">Objects to insert.</param>
/// <param name="collectionName">Target collection name.</param>
/// <returns>true when the insert completed; false when an exception was caught and logged.</returns>
public bool InsertMany<T>(List<T> list, string collectionName)
{
    try
    {
        var target = MongoDatabase.GetCollection<BsonDocument>(collectionName);
        var documents = new List<BsonDocument>();
        foreach (var item in list)
        {
            documents.Add(item.ToBsonDocument());
        }
        target.InsertMany(documents);
        return true;
    }
    catch (Exception ex)
    {
        var log = LogManager.GetLogger(GetType());
        log.Error(String.Format("InsertMany collectionName {0},Exception {1}", collectionName, ex.Message));
        return false;
    }
}
/// <summary>
/// Converts every element of <paramref name="list"/> to a BsonDocument and starts an
/// asynchronous bulk insert without waiting for it to finish.
/// </summary>
/// <typeparam name="T">Type of the objects to store.</typeparam>
/// <param name="list">Objects to insert.</param>
/// <param name="collectionName">Target collection name.</param>
/// <remarks>
/// Failures while starting the insert are logged by the catch block; failures of the
/// insert itself are logged by a continuation, because an un-awaited task's exception
/// never reaches the surrounding try/catch.
/// </remarks>
public void InsertManyAsync<T>(List<T> list, string collectionName)
{
    try
    {
        var collection = MongoDatabase.GetCollection<BsonDocument>(collectionName);
        var bsonList = new List<BsonDocument>();
        list.ForEach(t => bsonList.Add(t.ToBsonDocument()));
        // Observe and log a faulted insert; without this the error would be lost silently.
        collection.InsertManyAsync(bsonList).ContinueWith(
            task => LogManager.GetLogger(this.GetType()).Error(String.Format("InsertManyAsync collectionName {0},Exception {1}", collectionName, task.Exception.GetBaseException().Message)),
            TaskContinuationOptions.OnlyOnFaulted);
    }
    catch (Exception ex)
    {
        LogManager.GetLogger(this.GetType()).Error(String.Format("InsertManyAsync collectionName {0},Exception {1}", collectionName, ex.Message));
    }
}
/// <summary>
/// Runs <paramref name="command"/> against the database and discards its result.
/// Any exception is caught and logged; nothing is reported back to the caller.
/// </summary>
/// <typeparam name="T">Result type of the command.</typeparam>
/// <param name="command">Command to execute.</param>
public void RunCommand<T>(Command<T> command)
{
    try
    {
        var database = MongoDatabase;
        database.RunCommand(command);
    }
    catch (Exception ex)
    {
        var log = LogManager.GetLogger(GetType());
        log.Error(String.Format("RunCommand command {0},Exception {1}", command, ex.Message));
    }
}
/// <summary>
/// Sends a "touch" command for the given collection, requesting both data and indexes.
/// Any exception is caught and logged; nothing is reported back to the caller.
/// </summary>
/// <param name="collectionName">Name of the collection to touch.</param>
/// <remarks>
/// The command is built as a BsonDocument rather than by concatenating JSON text, so a
/// collection name containing quotes or backslashes cannot break or alter the command.
/// The reply is read as a BsonDocument and discarded.
/// </remarks>
public void TouchCollection(string collectionName)
{
    try
    {
        // "data"/"index" are kept as the string "true", exactly as the command was sent before.
        // NOTE(review): the server may expect real booleans here — confirm against the target MongoDB version.
        var commandDocument = new BsonDocument
        {
            { "touch", collectionName },
            { "data", "true" },
            { "index", "true" }
        };
        var command = new BsonDocumentCommand<BsonDocument>(commandDocument);
        MongoDatabase.RunCommand(command);
    }
    catch (Exception ex)
    {
        LogManager.GetLogger(this.GetType()).Error(String.Format("TouchCollection collectionName {0},Exception {1}", collectionName, ex.Message));
    }
}
/// <summary>
/// Synchronously replaces the first document matching <paramref name="filter"/>.
/// </summary>
/// <typeparam name="T">Document type of the collection.</typeparam>
/// <param name="filter">Filter selecting the document to replace.</param>
/// <param name="replacement">Replacement document.</param>
/// <param name="collectionName">Target collection name.</param>
/// <returns>
/// true when the write was acknowledged and at least one document matched the filter;
/// false when nothing matched, the write was unacknowledged, or an exception was caught and logged.
/// </returns>
public bool ReplaceOne<T>(FilterDefinition<T> filter, T replacement, string collectionName)
{
    try
    {
        var collection = MongoDatabase.GetCollection<T>(collectionName);
        var result = collection.ReplaceOne(filter, replacement);
        // IsModifiedCountAvailable only reports a server capability, not the outcome.
        // MatchedCount is only readable on acknowledged results, hence the guard.
        return result.IsAcknowledged && result.MatchedCount > 0;
    }
    catch (Exception ex)
    {
        LogManager.GetLogger(this.GetType()).Error(String.Format("ReplaceOne collectionName {0},Exception {1}", collectionName, ex.Message));
        return false;
    }
}
/// <summary>
/// Calls the driver's FindOneAndReplace for the first document matching
/// <paramref name="filter"/> and returns what the driver returns.
/// </summary>
/// <typeparam name="T">Document type of the collection.</typeparam>
/// <param name="filter">Filter selecting the document to replace.</param>
/// <param name="replacement">Replacement document.</param>
/// <param name="collectionName">Target collection name.</param>
/// <returns>The driver's result, or default(T) when an exception was caught and logged.</returns>
public T FindOneAndReplace<T>(FilterDefinition<T> filter, T replacement, string collectionName)
{
    try
    {
        var source = MongoDatabase.GetCollection<T>(collectionName);
        return source.FindOneAndReplace(filter, replacement);
    }
    catch (Exception ex)
    {
        var log = LogManager.GetLogger(GetType());
        log.Error(String.Format("FindOneAndReplace collectionName {0} , Exception {1}", collectionName, ex.Message));
    }
    return default(T);
}
/// <summary>
/// Starts an asynchronous replace of the first document matching <paramref name="filter"/>
/// without waiting for it to finish.
/// </summary>
/// <typeparam name="T">Document type of the collection.</typeparam>
/// <param name="filter">Filter selecting the document to replace.</param>
/// <param name="replacement">Replacement document.</param>
/// <param name="collectionName">Target collection name.</param>
/// <returns>
/// true when the operation was started (NOT when it succeeded); false when starting it
/// threw an exception, which is logged.
/// </returns>
/// <remarks>
/// A failure of the replace itself is logged by a continuation, because an un-awaited
/// task's exception never reaches the surrounding try/catch.
/// </remarks>
public bool ReplaceOneAsync<T>(FilterDefinition<T> filter, T replacement, string collectionName)
{
    try
    {
        var collection = MongoDatabase.GetCollection<T>(collectionName);
        // Observe and log a faulted replace; without this the error would be lost silently.
        collection.ReplaceOneAsync(filter, replacement).ContinueWith(
            task => LogManager.GetLogger(this.GetType()).Error(String.Format("ReplaceOneAsync collectionName {0},Exception {1}", collectionName, task.Exception.GetBaseException().Message)),
            TaskContinuationOptions.OnlyOnFaulted);
        return true;
    }
    catch (Exception ex)
    {
        LogManager.GetLogger(this.GetType()).Error(String.Format("ReplaceOneAsync collectionName {0},Exception {1}", collectionName, ex.Message));
        return false;
    }
}
/// <summary>
/// Finds one document matching the filter and replaces it; if none is found,
/// inserts the replacement instead.
/// </summary>
/// <typeparam name="T">Document type of the collection.</typeparam>
/// <param name="filter">Filter condition, e.g.
/// var filter = Builders&lt;T&gt;.Filter.Eq("CompanyId", id)
///            &amp; Builders&lt;T&gt;.Filter.Gt("VehicleId", 0);
/// </param>
/// <param name="replacement">Replacement document (should keep the same structure as the stored one).</param>
/// <param name="collectionName">Target collection name.</param>
/// <remarks>
/// The find-and-replace and the fallback insert are two separate operations. The insert is
/// performed synchronously so that a failure is caught and logged here instead of being
/// lost in an un-awaited task.
/// </remarks>
public void ReplaceOrInsertOne<T>(FilterDefinition<T> filter, T replacement, string collectionName)
{
    try
    {
        var collection = MongoDatabase.GetCollection<T>(collectionName);
        var result = collection.FindOneAndReplace(filter, replacement);
        if (result == null)
        {
            // Nothing matched: insert instead.
            collection.InsertOne(replacement);
        }
    }
    catch (Exception ex)
    {
        LogManager.GetLogger(this.GetType()).Error(String.Format("ReplaceOrInsertOne collectionName {0} , Exception {1}", collectionName, ex.Message));
    }
}
/// <summary>
/// Synchronously applies <paramref name="updateDefinition"/> to the first document
/// matching <paramref name="filter"/>.
/// </summary>
/// <typeparam name="T">Document type of the collection.</typeparam>
/// <param name="filter">Filter selecting the document to update.</param>
/// <param name="updateDefinition">Update to apply.</param>
/// <param name="collectionName">Target collection name.</param>
/// <returns>
/// true when the write was acknowledged and at least one document matched the filter;
/// false when nothing matched, the write was unacknowledged, or an exception was caught and logged.
/// </returns>
public bool UpdateOne<T>(FilterDefinition<T> filter, UpdateDefinition<T> updateDefinition, string collectionName)
{
    try
    {
        var collection = MongoDatabase.GetCollection<T>(collectionName);
        var result = collection.UpdateOne(filter, updateDefinition);
        // IsModifiedCountAvailable only reports a server capability, not the outcome.
        // MatchedCount is only readable on acknowledged results, hence the guard.
        return result.IsAcknowledged && result.MatchedCount > 0;
    }
    catch (Exception ex)
    {
        LogManager.GetLogger(this.GetType()).Error(String.Format("UpdateOne collectionName {0},Exception {1}", collectionName, ex.Message));
        return false;
    }
}
/// <summary>
/// Starts an asynchronous update of the first document matching <paramref name="filter"/>
/// without waiting for it to finish.
/// </summary>
/// <typeparam name="T">Document type of the collection.</typeparam>
/// <param name="filter">Filter selecting the document to update.</param>
/// <param name="updateDefinition">Update to apply.</param>
/// <param name="collectionName">Target collection name.</param>
/// <returns>
/// true when the operation was started (NOT when it succeeded); false when starting it
/// threw an exception, which is logged.
/// </returns>
/// <remarks>
/// A failure of the update itself is logged by a continuation, because an un-awaited
/// task's exception never reaches the surrounding try/catch.
/// </remarks>
public bool UpdateOneAsync<T>(FilterDefinition<T> filter, UpdateDefinition<T> updateDefinition, string collectionName)
{
    try
    {
        var collection = MongoDatabase.GetCollection<T>(collectionName);
        // Observe and log a faulted update; without this the error would be lost silently.
        collection.UpdateOneAsync(filter, updateDefinition).ContinueWith(
            task => LogManager.GetLogger(this.GetType()).Error(String.Format("UpdateOneAsync collectionName {0},Exception {1}", collectionName, task.Exception.GetBaseException().Message)),
            TaskContinuationOptions.OnlyOnFaulted);
        return true;
    }
    catch (Exception ex)
    {
        LogManager.GetLogger(this.GetType()).Error(String.Format("UpdateOneAsync collectionName {0},Exception {1}", collectionName, ex.Message));
        return false;
    }
}
/// <summary>
/// Returns the first document matching <paramref name="filter"/>.
/// </summary>
/// <typeparam name="T">Document type of the collection.</typeparam>
/// <param name="filter">Filter condition.</param>
/// <param name="collectionName">Collection to query.</param>
/// <returns>
/// The result of FirstOrDefault on the query, or default(T) when an exception was caught and logged.
/// </returns>
public T FindOne<T>(FilterDefinition<T> filter, string collectionName)
{
    try
    {
        var source = MongoDatabase.GetCollection<T>(collectionName);
        var query = source.Find(filter);
        return query.FirstOrDefault();
    }
    catch (Exception ex)
    {
        var log = LogManager.GetLogger(GetType());
        log.Error(String.Format("FindOne collectionName {0},Exception {1}", collectionName, ex.Message));
    }
    return default(T);
}
/// <summary>
/// Returns all documents matching the filter.
/// </summary>
/// <typeparam name="T">Document type of the collection.</typeparam>
/// <param name="filter">Filter condition, e.g.
/// var filter = Builders&lt;T&gt;.Filter.Eq("CompanyId", id)
///            &amp; Builders&lt;T&gt;.Filter.Gt("VehicleId", 0);
/// </param>
/// <param name="collectionName">Collection to query.</param>
/// <returns>The matching documents, or an empty list when an exception was caught and logged.</returns>
public List<T> Find<T>(FilterDefinition<T> filter, string collectionName)
{
    try
    {
        var collection = MongoDatabase.GetCollection<T>(collectionName);
        var t = collection.Find(filter).ToList();
        return t;
    }
    catch (Exception ex)
    {
        // Label the entry with this method's own name so failures are not attributed to FindOne.
        LogManager.GetLogger(this.GetType()).Error(String.Format("Find collectionName {0},Exception {1}", collectionName, ex.Message));
        return new List<T>();
    }
}
/// <summary>
/// Returns all documents matching the filter, in the given sort order.
/// </summary>
/// <typeparam name="T">Document type of the collection.</typeparam>
/// <param name="filter">Filter condition, e.g.
/// var filter = Builders&lt;T&gt;.Filter.Eq("CompanyId", id)
///            &amp; Builders&lt;T&gt;.Filter.Gt("VehicleId", 0);
/// </param>
/// <param name="sort">Sort order, e.g. var sort = Builders&lt;T&gt;.Sort.Descending(s => s.VehicleId);</param>
/// <param name="collectionName">Collection to query.</param>
/// <returns>The sorted matches, or an empty list when an exception was caught and logged.</returns>
public List<T> Find<T>(FilterDefinition<T> filter, SortDefinition<T> sort, string collectionName)
{
    try
    {
        var source = MongoDatabase.GetCollection<T>(collectionName);
        var query = source.Find(filter);
        return query.Sort(sort).ToList();
    }
    catch (Exception ex)
    {
        var log = LogManager.GetLogger(GetType());
        log.Error(String.Format("Find collectionName {0},Exception {1}", collectionName, ex.Message));
    }
    return new List<T>();
}
/// <summary>
/// Returns one page of the documents matching the filter. No sort is applied by
/// this overload.
/// </summary>
/// <typeparam name="T">Document type of the collection.</typeparam>
/// <param name="filter">Filter condition, e.g.
/// var filter = Builders&lt;T&gt;.Filter.Eq("CompanyId", id)
///            &amp; Builders&lt;T&gt;.Filter.Gt("VehicleId", 0);
/// </param>
/// <param name="pageNumber">Page number; (pageNumber - 1) * pageSize documents are skipped.</param>
/// <param name="pageSize">Maximum number of documents to return.</param>
/// <param name="collectionName">Collection to query.</param>
/// <returns>The requested page, or an empty list when an exception was caught and logged.</returns>
public List<T> FindForPage<T>(FilterDefinition<T> filter, int pageNumber, int pageSize, string collectionName)
{
    try
    {
        var source = MongoDatabase.GetCollection<T>(collectionName);
        var query = source.Find(filter);
        var skipCount = (pageNumber - 1) * pageSize;
        return query.Skip(skipCount).Limit(pageSize).ToList();
    }
    catch (Exception ex)
    {
        var log = LogManager.GetLogger(GetType());
        log.Error(String.Format("Find collectionName {0},Exception {1}", collectionName, ex.Message));
    }
    return new List<T>();
}
/// <summary>
/// Returns one page of the documents matching the filter, in the given sort order.
/// </summary>
/// <typeparam name="T">Document type of the collection.</typeparam>
/// <param name="filter">Filter condition, e.g.
/// var filter = Builders&lt;T&gt;.Filter.Eq("CompanyId", id)
///            &amp; Builders&lt;T&gt;.Filter.Gt("VehicleId", 0);
/// </param>
/// <param name="sort">Sort order, e.g. var sort = Builders&lt;T&gt;.Sort.Descending(s => s.VehicleId);</param>
/// <param name="pageNumber">Page number; (pageNumber - 1) * pageSize documents are skipped.</param>
/// <param name="pageSize">Maximum number of documents to return.</param>
/// <param name="collectionName">Collection to query.</param>
/// <returns>The requested page, or an empty list when an exception was caught and logged.</returns>
public List<T> FindForPage<T>(FilterDefinition<T> filter, SortDefinition<T> sort, int pageNumber, int pageSize, string collectionName)
{
    try
    {
        var source = MongoDatabase.GetCollection<T>(collectionName);
        var ordered = source.Find(filter).Sort(sort);
        var skipCount = (pageNumber - 1) * pageSize;
        return ordered.Skip(skipCount).Limit(pageSize).ToList();
    }
    catch (Exception ex)
    {
        var log = LogManager.GetLogger(GetType());
        log.Error(String.Format("Find collectionName {0},Exception {1}", collectionName, ex.Message));
    }
    return new List<T>();
}
/// <summary>
/// Returns the documents matching the filter, projected to the requested columns.
/// </summary>
/// <typeparam name="T">Document type of the collection.</typeparam>
/// <typeparam name="T2">Type produced by the projection.</typeparam>
/// <param name="filter">Filter condition.</param>
/// <param name="projectionDefinition">
/// Projection, e.g.
/// var pj1 = Builders&lt;T&gt;.Projection.Expression(s => new { s.VehicleId, s.GpsTime, s.Longitude, s.Latitude, s.VehicleCode });
/// or
/// var pj2 = Builders&lt;T&gt;.Projection.Expression(s => new CarCarGps
/// {
///     VehicleId = s.VehicleId,
///     GpsTime = s.GpsTime,
///     Longitude = s.Longitude,
///     Latitude = s.Latitude,
///     VehicleCode = s.VehicleCode
/// });
/// </param>
/// <param name="collectionName">Collection to query.</param>
/// <returns>The projected matches, or an empty list when an exception was caught and logged.</returns>
public List<T2> FindByProjection<T, T2>(FilterDefinition<T> filter, ProjectionDefinition<T, T2> projectionDefinition, string collectionName)
{
    try
    {
        var source = MongoDatabase.GetCollection<T>(collectionName);
        var projected = source.Find(filter).Project(projectionDefinition);
        return projected.ToList();
    }
    catch (Exception ex)
    {
        var log = LogManager.GetLogger(GetType());
        log.Error(String.Format("Find collectionName {0},Exception {1}", collectionName, ex.Message));
    }
    return new List<T2>();
}
/// <summary>
/// Returns one page of the documents matching the filter, projected to the requested
/// columns and in the given sort order.
/// </summary>
/// <typeparam name="T">Document type of the collection.</typeparam>
/// <typeparam name="T2">Type produced by the projection.</typeparam>
/// <param name="filter">Filter condition.</param>
/// <param name="projectionDefinition">
/// Projection, e.g.
/// var pj1 = Builders&lt;T&gt;.Projection.Expression(s => new { s.VehicleId, s.GpsTime, s.Longitude, s.Latitude, s.VehicleCode });
/// or
/// var pj2 = Builders&lt;T&gt;.Projection.Expression(s => new CarCarGps
/// {
///     VehicleId = s.VehicleId,
///     GpsTime = s.GpsTime,
///     Longitude = s.Longitude,
///     Latitude = s.Latitude,
///     VehicleCode = s.VehicleCode
/// });
/// </param>
/// <param name="sort">Sort order, e.g. var sort = Builders&lt;T&gt;.Sort.Descending(s => s.VehicleId);</param>
/// <param name="pageNumber">Page number; (pageNumber - 1) * pageSize documents are skipped.</param>
/// <param name="pageSize">Maximum number of documents to return.</param>
/// <param name="collectionName">Collection to query.</param>
/// <returns>The requested page, or an empty list when an exception was caught and logged.</returns>
public List<T2> FindByProjection<T, T2>(FilterDefinition<T> filter, ProjectionDefinition<T, T2> projectionDefinition, SortDefinition<T> sort, int pageNumber, int pageSize, string collectionName)
{
    try
    {
        var source = MongoDatabase.GetCollection<T>(collectionName);
        var ordered = source.Find(filter).Project(projectionDefinition).Sort(sort);
        var skipCount = (pageNumber - 1) * pageSize;
        return ordered.Skip(skipCount).Limit(pageSize).ToList();
    }
    catch (Exception ex)
    {
        var log = LogManager.GetLogger(GetType());
        log.Error(String.Format("Find collectionName {0},Exception {1}", collectionName, ex.Message));
    }
    return new List<T2>();
}
/// <summary>
/// Synchronously drops the named collection.
/// </summary>
/// <param name="collectionName">Collection to drop.</param>
/// <returns>true when the drop call completed; false when an exception was caught and logged.</returns>
public bool DropCollection(string collectionName)
{
    try
    {
        var database = MongoDatabase;
        database.DropCollection(collectionName);
        return true;
    }
    catch (Exception ex)
    {
        var log = LogManager.GetLogger(GetType());
        log.Error(String.Format("DropCollection {0},Exception {1}", collectionName, ex.Message));
        return false;
    }
}
/// <summary>
/// Starts an asynchronous drop of the named collection without waiting for it to finish.
/// </summary>
/// <param name="collectionName">Collection to drop.</param>
/// <returns>
/// true when the operation was started (NOT when it succeeded); false when starting it
/// threw an exception, which is logged.
/// </returns>
/// <remarks>
/// A failure of the drop itself is logged by a continuation, because an un-awaited
/// task's exception never reaches the surrounding try/catch.
/// </remarks>
public bool DropCollectionAsync(string collectionName)
{
    try
    {
        // Observe and log a faulted drop; without this the error would be lost silently.
        MongoDatabase.DropCollectionAsync(collectionName).ContinueWith(
            task => LogManager.GetLogger(this.GetType()).Error(String.Format("DropCollectionAsync {0},Exception {1}", collectionName, task.Exception.GetBaseException().Message)),
            TaskContinuationOptions.OnlyOnFaulted);
        return true;
    }
    catch (Exception ex)
    {
        LogManager.GetLogger(this.GetType()).Error(String.Format("DropCollectionAsync {0},Exception {1}", collectionName, ex.Message));
    }
    return false;
}
/// <summary>
/// Counts the documents matching <paramref name="filter"/>.
/// </summary>
/// <typeparam name="T">Document type of the collection.</typeparam>
/// <param name="collectionName">Collection to query.</param>
/// <param name="filter">Filter condition.</param>
/// <param name="totalCount">Receives the same count that is returned.</param>
/// <returns>The number of matching documents.</returns>
/// <exception cref="OverflowException">The count does not fit in an int.</exception>
/// <remarks>Unlike most methods of this class, exceptions are not caught here; they propagate to the caller.</remarks>
public int GetCount<T>(string collectionName, FilterDefinition<T> filter, out int totalCount)
{
    long matched = MongoDatabase.GetCollection<T>(collectionName).Find(filter).Count();
    // Narrow with an overflow check instead of formatting to a string and re-parsing it.
    totalCount = checked((int)matched);
    return totalCount;
}
/// <summary>
/// Returns one sorted page of the documents matching <paramref name="filter"/> together
/// with the total number of matches.
/// </summary>
/// <typeparam name="T">Document type of the collection.</typeparam>
/// <param name="collectionName">Collection to query.</param>
/// <param name="filter">Filter condition.</param>
/// <param name="totalCount">Receives the total number of documents matching the filter (all pages).</param>
/// <param name="sort">Sort order applied before paging.</param>
/// <param name="pageIndex">Page number; (pageIndex - 1) * pageSize documents are skipped. Defaults to 1.</param>
/// <param name="pageSize">Maximum number of documents to return. Defaults to 10.</param>
/// <returns>The requested page of documents.</returns>
/// <exception cref="OverflowException">The total count does not fit in an int.</exception>
/// <remarks>Unlike most methods of this class, exceptions are not caught here; they propagate to the caller.</remarks>
public List<T> GetListPage<T>(string collectionName, FilterDefinition<T> filter, out int totalCount, SortDefinition<T> sort, int pageIndex = 1, int pageSize = 10)
{
    // Resolve the collection once and reuse it for both the count and the page query.
    var collection = MongoDatabase.GetCollection<T>(collectionName);
    long matched = collection.Find(filter).Count();
    // Narrow with an overflow check instead of formatting to a string and re-parsing it.
    totalCount = checked((int)matched);
    return collection.Find(filter).Sort(sort).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToList();
}
}
}