Kinetica   C#   API  Version 7.2.3.1
HttpClientTransport.cs
Go to the documentation of this file.
1 using System;
2 using System.IO;
3 using System.Net;
4 using System.Net.Http;
5 using System.Net.Http.Headers;
6 using System.Threading;
7 using System.Threading.Tasks;
8 
9 namespace kinetica;
10 
15  internal sealed class HttpClientTransport : IHttpTransport, IDisposable
16  {
17  private readonly HttpClient _client;
18  private readonly bool _ownsClient;
19 
27  TimeSpan timeout,
28  TimeSpan? pooledConnectionLifetime = null,
29  TimeSpan? pooledConnectionIdleTimeout = null)
30  {
31  var handler = new SocketsHttpHandler
32  {
33  // Connection pooling with DNS refresh
34  PooledConnectionLifetime = pooledConnectionLifetime ?? TimeSpan.FromMinutes(2),
35  PooledConnectionIdleTimeout = pooledConnectionIdleTimeout ?? TimeSpan.FromMinutes(2),
36 
37  // Kinetica-specific optimizations
38  AutomaticDecompression = DecompressionMethods.None, // Kinetica uses Snappy
39  UseCookies = false, // Not needed for API calls
40  AllowAutoRedirect = false, // API endpoints don't redirect
41  };
42 
43  _client = new HttpClient(handler, disposeHandler: true)
44  {
45  Timeout = timeout,
46  };
47  _ownsClient = true;
48  }
49 
/// <summary>
/// Wraps a caller-supplied <see cref="HttpClient"/>. The transport does not take
/// ownership of it: <see cref="Dispose"/> leaves <paramref name="client"/> untouched.
/// </summary>
/// <param name="client">Client used for every request; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="client"/> is null.</exception>
internal HttpClientTransport(HttpClient client)
{
    // Fail fast here instead of with a NullReferenceException on the first request.
    _client = client ?? throw new ArgumentNullException(nameof(client));
    _ownsClient = false;
}
59 
/// <summary>
/// Issues a blocking HTTP POST and returns the raw response payload.
/// </summary>
/// <param name="url">Target endpoint.</param>
/// <param name="body">Request payload bytes.</param>
/// <param name="contentType">Value for the Content-Type header.</param>
/// <param name="authorization">Optional Authorization header value; skipped when null or empty.</param>
/// <param name="cancellationToken">Token passed to the send and to the response read.</param>
/// <returns>The response body bytes when the status code indicates success.</returns>
/// <exception cref="KineticaTransportException">The server answered with a non-success status code.</exception>
public byte[] Post(
    string url,
    byte[] body,
    string contentType,
    string? authorization,
    CancellationToken cancellationToken)
{
    using (HttpRequestMessage outgoing = BuildRequest(url, body, contentType, authorization))
    using (HttpResponseMessage reply = _client.Send(outgoing, cancellationToken))
    {
        return ReadOrThrow(reply, cancellationToken);
    }
}
74 
/// <summary>
/// Issues an asynchronous HTTP POST and returns the raw response payload.
/// </summary>
/// <param name="url">Target endpoint.</param>
/// <param name="body">Request payload bytes.</param>
/// <param name="contentType">Value for the Content-Type header.</param>
/// <param name="authorization">Optional Authorization header value; skipped when null or empty.</param>
/// <param name="cancellationToken">Token passed to the send and to the response read.</param>
/// <returns>The response body bytes when the status code indicates success.</returns>
/// <exception cref="KineticaTransportException">
/// The server answered with a non-success status code; the exception carries the
/// status code and the raw response body.
/// </exception>
public async Task<byte[]> PostAsync(
    string url,
    byte[] body,
    string contentType,
    string? authorization,
    CancellationToken cancellationToken)
{
    using var request = BuildRequest(url, body, contentType, authorization);

    // ResponseContentRead keeps HttpClient.Timeout in force until the whole body
    // has been buffered. The previous ResponseHeadersRead option stopped timeout
    // coverage as soon as the headers arrived, so a stalled body download could
    // hang past the configured timeout. The body is read in full right below,
    // so streaming bought nothing here.
    using var response = await _client
        .SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken)
        .ConfigureAwait(false);

    var bytes = await response.Content
        .ReadAsByteArrayAsync(cancellationToken)
        .ConfigureAwait(false);

    if (response.IsSuccessStatusCode)
    {
        return bytes;
    }

    // Error bodies are handed back raw so the caller can decode them.
    throw new KineticaTransportException((int)response.StatusCode, bytes);
}
99 
/// <summary>
/// Builds the HTTP/1.1 POST message shared by the synchronous and asynchronous paths.
/// </summary>
/// <param name="url">Target endpoint.</param>
/// <param name="body">Request payload bytes.</param>
/// <param name="contentType">Content-Type header value; parsed with <see cref="MediaTypeHeaderValue.Parse(string)"/>.</param>
/// <param name="authorization">
/// Optional Authorization header value. A value of the form "scheme credentials" is split
/// at the first space; any other non-empty value is sent verbatim.
/// </param>
/// <returns>A request message the caller is responsible for disposing.</returns>
private static HttpRequestMessage BuildRequest(
    string url,
    byte[] body,
    string contentType,
    string? authorization)
{
    var request = new HttpRequestMessage(HttpMethod.Post, url)
    {
        Content = new ByteArrayContent(body),
        // Force HTTP/1.1 to preserve existing behaviour — Kinetica servers
        // typically do not support HTTP/2 and mis-negotiation would break
        // the connection silently.
        Version = HttpVersion.Version11,
    };
    request.Content.Headers.ContentType = MediaTypeHeaderValue.Parse(contentType);
    request.Content.Headers.ContentLength = body.Length;

    if (string.IsNullOrEmpty(authorization))
    {
        return request;
    }

    var space = authorization.IndexOf(' ');
    if (space > 0)
    {
        request.Headers.Authorization = new AuthenticationHeaderValue(
            authorization[..space],
            authorization[(space + 1)..]);
    }
    else
    {
        // Legacy scheme-less credential: send it as-is. The validated Add() parses the
        // value as an authentication scheme and throws FormatException when it is not
        // a plain token (for example base64 text padded with '='), so skip validation.
        request.Headers.TryAddWithoutValidation("Authorization", authorization);
    }

    return request;
}
135 
/// <summary>
/// Copies the complete response body into a byte array, returning it for a
/// success status code and throwing for any other.
/// </summary>
/// <param name="response">Response whose content is read.</param>
/// <param name="cancellationToken">Token passed to the content stream request.</param>
/// <returns>The response body bytes.</returns>
/// <exception cref="KineticaTransportException">
/// The status code does not indicate success; the exception carries the status
/// code and the raw body.
/// </exception>
private static byte[] ReadOrThrow(
    HttpResponseMessage response,
    CancellationToken cancellationToken)
{
    byte[] payload;
    using (Stream source = response.Content.ReadAsStream(cancellationToken))
    using (var sink = new MemoryStream())
    {
        source.CopyTo(sink);
        payload = sink.ToArray();
    }

    if (!response.IsSuccessStatusCode)
    {
        // Kinetica error bodies use the same Avro/JSON envelope as successful
        // responses, so the raw bytes travel inside KineticaTransportException
        // and SubmitRequestToUrlInternal can decode the server's error message.
        throw new KineticaTransportException((int)response.StatusCode, payload);
    }

    return payload;
}
154 
/// <summary>
/// Disposes the underlying <see cref="HttpClient"/>, but only when this
/// transport created it; an injected client is left alone.
/// </summary>
public void Dispose()
{
    if (!_ownsClient)
    {
        return;
    }

    _client.Dispose();
}
160  }
161 
/// <summary>
/// Thrown by HttpClientTransport when the server responds with a non-2xx status code.
/// </summary>
internal sealed class KineticaTransportException : Exception
{
    /// <summary>HTTP status code from the server response.</summary>
    public int StatusCode { get; }

    /// <summary>
    /// Raw response body bytes (may contain an Avro-encoded error message).
    /// Never null; empty when no body was supplied.
    /// </summary>
    public byte[] Body { get; }

    /// <summary>
    /// Creates the exception for a failed HTTP exchange.
    /// </summary>
    /// <param name="statusCode">HTTP status code returned by the server.</param>
    /// <param name="body">Raw response body; a null value is stored as an empty array.</param>
    public KineticaTransportException(int statusCode, byte[] body)
        : base($"Kinetica server returned HTTP {statusCode}.")
    {
        StatusCode = statusCode;
        // Normalise null so code decoding Body never has to null-check it.
        Body = body ?? Array.Empty<byte>();
    }
}
int StatusCode
HTTP status code from the server response.
byte [] Body
Raw response body bytes (may contain Avro-encoded error message).
byte [] Post(string url, byte[] body, string contentType, string? authorization, CancellationToken cancellationToken)
Synchronous POST request.
Thrown by HttpClientTransport when the server responds with a non-2xx status code.
Abstraction over the raw HTTP POST layer.
async Task< byte[]> PostAsync(string url, byte[] body, string contentType, string? authorization, CancellationToken cancellationToken)
Asynchronous POST request.
IHttpTransport implementation backed by HttpClient.
HttpClientTransport(TimeSpan timeout, TimeSpan? pooledConnectionLifetime=null, TimeSpan? pooledConnectionIdleTimeout=null)
Creates a new HttpClientTransport with configurable timeout and connection pooling.
KineticaTransportException(int statusCode, byte[] body)