#r "nuget:Docker.DotNet"
#r "nuget:Npgsql"
using Docker.DotNet;
using Docker.DotNet.Models;
using Npgsql;
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
// Connection to the test database; assigned once the PostgreSQL container is up.
DbConnection Db;
// Client used for every Docker Engine API call in this script.
DockerClient _client;
// Declared for the rest of the script; not assigned in this part.
CreateContainerResponse _containerResponse;
// Choose the local Docker Engine endpoint for the current OS: the named pipe on
// Windows, the Unix domain socket everywhere else.
var dockerEndpoint = Environment.OSVersion.Platform == PlatformID.Win32NT
    ? "npipe://./pipe/docker_engine"
    : "unix:///var/run/docker.sock";
_client = new DockerClientConfiguration(new Uri(dockerEndpoint)).CreateClient();
// Random host port in [10000, 12000) so that parallel runs are unlikely to collide.
// The parameterless Random constructor picks its own seed, so no manual tick-based seed is needed.
var hostPort = new Random().Next(10000, 12000);
// Ask the Docker daemon for images whose reference matches postgres:latest.
var referenceFilter = new Dictionary<string, bool> { { "postgres:latest", true } };
var imageListParameters = new ImagesListParameters
{
    Filters = new Dictionary<string, IDictionary<string, bool>>
    {
        { "reference", referenceFilter }
    }
};
var images = await _client.Images.ListImagesAsync(imageListParameters, CancellationToken.None);

// Check that the image exists before trying to create a container from it.
var pgImage = images.FirstOrDefault();
if (pgImage == null)
{
    throw new Exception("Docker image for postgres:latest not found.");
}
// Create a container from the postgres:latest image. Container port 5432 is
// bound to the randomly chosen host port on all host interfaces.
var postgresEnvironment = new List<string>
{
    "POSTGRES_PASSWORD=password",
    "POSTGRES_DB=repotest",
    "POSTGRES_USER=postgres"
};
var hostBinding = new PortBinding { HostIP = "0.0.0.0", HostPort = $"{hostPort}" };
var createParameters = new CreateContainerParameters
{
    Image = "postgres:latest",
    User = "postgres",
    Env = postgresEnvironment,
    ExposedPorts = new Dictionary<string, EmptyStruct>
    {
        { "5432", new EmptyStruct() }
    },
    HostConfig = new HostConfig
    {
        PortBindings = new Dictionary<string, IList<PortBinding>>
        {
            { "5432", new List<PortBinding> { hostBinding } }
        }
    }
};
var container = await _client.Containers.CreateContainerAsync(createParameters, CancellationToken.None);

// Start the container and fail fast if the start call reports failure.
var startParameters = new ContainerStartParameters { DetachKeys = "d=postgres" };
var containerStarted = await _client.Containers.StartContainerAsync(container.ID, startParameters, CancellationToken.None);
if (!containerStarted)
{
    throw new Exception($"Could not start container: {container.ID}");
}
// Wait until Docker reports the container as running: an initial 5 s pause,
// then up to 10 further checks one second apart.
// Task.Delay is awaited instead of Thread.Sleep so the script does not block a thread while waiting.
var count = 10;
await Task.Delay(5000);
var containerStat = await _client.Containers.InspectContainerAsync(container.ID, CancellationToken.None);
while (!containerStat.State.Running && count-- > 0)
{
    await Task.Delay(1000);
    containerStat = await _client.Containers.InspectContainerAsync(container.ID, CancellationToken.None);
}
// Fail here with a clear message instead of letting the later connection attempt
// fail against a container that never came up.
if (!containerStat.State.Running)
{
    throw new InvalidOperationException($"Container {container.ID} did not reach the running state in time.");
}
await Task.Delay(10000); //I need some time for the DB to finish starting up so that my tests don't report the DB is starting up
// Connect to the repotest database through the published host port.
var rawConnectionString =
    $"User ID=postgres;Password=password;Server=127.0.0.1;Port={hostPort};" +
    "Database=repotest;Integrated Security=true;Pooling=false;CommandTimeout=300";
var connectionStringBuilder = new NpgsqlConnectionStringBuilder();
connectionStringBuilder.ConnectionString = rawConnectionString;
Db = new NpgsqlConnection(connectionStringBuilder.ConnectionString);
Db.Open();
// Print whether the connection reports the expected database name.
var connectedToExpectedDatabase = String.Equals("repotest", Db.Database);
Console.WriteLine(connectedToExpectedDatabase);
True
// (Re)create the datatimeseries table so each run starts from an empty table.
using (var cmd = Db.CreateCommand())
{
    var schemaStatements = new[]
    {
        "DROP TABLE IF EXISTS datatimeseries",
        "CREATE TABLE datatimeseries(idtimeseries INTEGER NOT NULL, timepoint TIMESTAMP NOT NULL, value1 NUMERIC NOT NULL)"
    };
    // The statements run in order on the same command object.
    foreach (var statement in schemaStatements)
    {
        cmd.CommandText = statement;
        cmd.ExecuteNonQuery();
    }
}
/// <summary>
/// Bulk-loads every row of <paramref name="dt"/> into the table named
/// <paramref name="TableName"/> with a binary COPY on the script-level
/// <c>Db</c> connection, which is cast to <c>NpgsqlConnection</c>.
/// </summary>
/// <param name="TableName">
/// Target table name. It is concatenated into the COPY statement unescaped,
/// so it must come from trusted code, never from user input.
/// </param>
/// <param name="dt">
/// Source table. Column names are lower-cased (culture-invariant) and used as
/// the target column names. DBNull cells and null/empty strings in string
/// columns are written as NULL.
/// </param>
/// <param name="nCount">Unused; kept so existing callers keep compiling.</param>
/// <exception cref="ArgumentNullException"><paramref name="dt"/> is null.</exception>
/// <exception cref="KeyNotFoundException">
/// A non-null value sits in a column whose CLR type has no NpgsqlDbType mapping here.
/// </exception>
public void DumpDataTableToDB(string TableName, DataTable dt, int nCount)
{
    if (dt == null)
    {
        throw new ArgumentNullException(nameof(dt));
    }

    // CLR column type -> PostgreSQL type used when writing a value.
    var typeDict = new Dictionary<Type, NpgsqlTypes.NpgsqlDbType>
    {
        [typeof(int)] = NpgsqlTypes.NpgsqlDbType.Integer,
        [typeof(double)] = NpgsqlTypes.NpgsqlDbType.Double,
        [typeof(decimal)] = NpgsqlTypes.NpgsqlDbType.Numeric,
        [typeof(string)] = NpgsqlTypes.NpgsqlDbType.Varchar,
        [typeof(DateTime)] = NpgsqlTypes.NpgsqlDbType.Timestamp,
        [typeof(char[])] = NpgsqlTypes.NpgsqlDbType.Varchar,
        [typeof(Guid)] = NpgsqlTypes.NpgsqlDbType.Uuid
    };

    // ToLowerInvariant, not ToLower: under some cultures (e.g. tr-TR) ToLower maps
    // 'I' to a dotless 'ı', which would produce column names the table does not have.
    var columnNames = new List<string>(dt.Columns.Count);
    foreach (System.Data.DataColumn col in dt.Columns)
    {
        columnNames.Add(col.ColumnName.ToLowerInvariant());
    }
    string sql = "COPY " + TableName + " ( " + string.Join(",", columnNames) + ") FROM STDIN (FORMAT BINARY)";

    using (var bulkWriter = ((Npgsql.NpgsqlConnection)Db).BeginBinaryImport(sql))
    {
        foreach (System.Data.DataRow row in dt.Rows)
        {
            bulkWriter.StartRow();
            foreach (System.Data.DataColumn col in dt.Columns)
            {
                // DBNull cells, and null/empty strings in string columns, are stored as NULL.
                // The cast is safe: it is only evaluated for non-null cells of string columns.
                if (row.IsNull(col) ||
                    (col.DataType == typeof(string) && string.IsNullOrEmpty((string)row[col])))
                {
                    bulkWriter.WriteNull();
                    continue;
                }

                NpgsqlTypes.NpgsqlDbType dbType;
                if (!typeDict.TryGetValue(col.DataType, out dbType))
                {
                    throw new KeyNotFoundException(
                        $"Column '{col.ColumnName}' has type {col.DataType}, which has no NpgsqlDbType mapping.");
                }
                bulkWriter.Write(row[col.Ordinal], dbType);
            }
        }
        // Without Complete() the import is not committed.
        bulkWriter.Complete();
    }
}
// Build an in-memory table whose three columns correspond to the datatimeseries table.
DataTable dt = new DataTable();
dt.Columns.Add("IDTIMESERIES", typeof(int));
dt.Columns.Add("TIMEPOINT", typeof(DateTime));
dt.Columns.Add("VALUE1", typeof(decimal));

// Fill it with 1000 sample rows: one series id per 100 rows, hourly timestamps
// starting from the current local time, and a sine-shaped value around 20.
int nRowsMaxDt = 1000;
DateTime now = DateTime.Now;
foreach (var i in Enumerable.Range(0, nRowsMaxDt))
{
    int seriesId = i / 100;
    DateTime timepoint = now.AddHours(i);
    decimal value1 = (decimal)(20 + 10 * Math.Sin(i / 66.0));
    dt.Rows.Add(seriesId, timepoint, value1);
}

// Bulk-load the rows into the database table.
DumpDataTableToDB("datatimeseries", dt, nRowsMaxDt);
// Read the row count back from the database and print it next to the expected count.
using (var cmd = Db.CreateCommand())
{
    cmd.CommandText = "SELECT COUNT(*) FROM datatimeseries";
    var rowCount = cmd.ExecuteScalar();
    var report = $"Total rows in table - Expected 1000 - Found {rowCount}";
    Console.WriteLine(report);
}
Total rows in table - Expected 1000 - Found 1000
// Tear down: close the database connection, then stop and remove the test container.
Db.Close();
Db.Dispose();
try
{
    if (await _client.Containers.StopContainerAsync(container.ID, new ContainerStopParameters(), CancellationToken.None))
    {
        Console.WriteLine("Container " + container.ID + " Stopped.");
    }
    else
    {
        // NOTE(review): a false result appears to mean the container was already
        // stopped — confirm against the Docker.DotNet documentation.
        Console.WriteLine("Container " + container.ID + " was not running.");
    }

    // Always delete the container so test runs do not accumulate leftovers.
    // RemoveVolumes also drops anonymous volumes attached to the container;
    // Force covers a container that is still running.
    await _client.Containers.RemoveContainerAsync(
        container.ID,
        new ContainerRemoveParameters { Force = true, RemoveVolumes = true },
        CancellationToken.None);
    Console.WriteLine("Container " + container.ID + " Deleted.");
}
finally
{
    // Release the Docker client even if stopping or removing the container failed.
    _client?.Dispose();
}
Container 3381398387d1a321f0bfd73e579ee3c4bdbd209aaa746fb9fe0a8f20dd46fc3b Deleted.