Browse Source

Move requests to their own modules

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
Yilin Chen 1 month ago
parent
commit
5638c419cb

+ 2 - 5
examples/transaction.rs

@@ -26,11 +26,8 @@ async fn get(client: &Client, key: Key) -> Option<Value> {
 // Ignore a spurious warning from rustc (https://github.com/rust-lang/rust/issues/60566).
 #[allow(unused_mut)]
 async fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
-    client
-        .begin()
-        .await
-        .expect("Could not begin a transaction")
-        .scan(range)
+    let mut txn = client.begin().await.expect("Could not begin a transaction");
+    txn.scan(range)
         .into_stream()
         .take_while(move |r| {
             assert!(r.is_ok(), "Could not scan keys");

+ 1 - 1
src/kv_client/client.rs

@@ -13,7 +13,7 @@ use kvproto::tikvpb::TikvClient;
 use std::sync::Arc;
 
 use crate::{
-    kv_client::{requests::KvRequest, HasError},
+    kv_client::{request::KvRequest, HasError},
     stats::tikv_stats,
     transaction::TxnInfo,
     ErrorKind, Result,

+ 3 - 2
src/kv_client/mod.rs

@@ -2,13 +2,14 @@
 
 mod client;
 mod errors;
-pub mod requests;
+mod request;
 
 pub use self::client::KvRpcClient;
 pub use self::errors::HasError;
+pub use self::request::{KvRequest, MockDispatch};
 pub use kvproto::tikvpb::TikvClient;
 
-use self::requests::{KvRequest, KvRpcRequest};
+use self::request::KvRpcRequest;
 use crate::pd::Region;
 use crate::security::SecurityManager;
 use crate::Result;

+ 0 - 5
src/kv_client/requests/mod.rs

@@ -1,7 +1,5 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
-pub use raw::*;
-
 use crate::{
     kv_client::{HasError, KvClient, RpcFnType, Store},
     pd::PdClient,
@@ -15,9 +13,6 @@ use grpcio::CallOption;
 use kvproto::kvrpcpb;
 use std::sync::Arc;
 
-mod mvcc;
-mod raw;
-
 pub trait KvRequest: Sync + Send + 'static + Sized + Clone {
     type Result;
     type RpcRequest;

+ 0 - 178
src/kv_client/requests/mvcc.rs

@@ -1,178 +0,0 @@
-use crate::{
-    kv_client::{requests::KvRequest, KvClient, RpcFnType, Store},
-    pd::PdClient,
-    BoundRange, Error, Key, KvPair, Result, Value,
-};
-use futures::future::BoxFuture;
-use futures::prelude::*;
-use futures::stream::BoxStream;
-use kvproto::kvrpcpb;
-use kvproto::tikvpb::TikvClient;
-use std::mem;
-use std::sync::Arc;
-
-#[derive(Clone)]
-pub struct MvccGet {
-    pub key: Key,
-    pub version: u64,
-}
-
-impl KvRequest for MvccGet {
-    type Result = Option<Value>;
-    type RpcRequest = kvrpcpb::GetRequest;
-    type RpcResponse = kvrpcpb::GetResponse;
-    type Payload = Key;
-    const REQUEST_NAME: &'static str = "kv_get";
-    const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> = TikvClient::kv_get_async_opt;
-
-    fn into_request<KvC: KvClient>(
-        &self,
-        key: Self::Payload,
-        store: &Store<KvC>,
-    ) -> Self::RpcRequest {
-        let mut req = store.request::<Self::RpcRequest>();
-        req.set_key(key.into());
-        req.set_version(self.version);
-
-        req
-    }
-
-    fn store_stream<PdC: PdClient>(
-        &mut self,
-        pd_client: Arc<PdC>,
-    ) -> BoxStream<'static, Result<(Self::Payload, Store<PdC::KvClient>)>> {
-        let key = self.key.clone();
-        pd_client
-            .store_for_key(&self.key)
-            .map_ok(move |store| (key, store))
-            .into_stream()
-            .boxed()
-    }
-
-    fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
-        let result: Value = resp.take_value().into();
-        if resp.not_found {
-            None
-        } else {
-            Some(result)
-        }
-    }
-
-    fn reduce(
-        results: BoxStream<'static, Result<Self::Result>>,
-    ) -> BoxFuture<'static, Result<Self::Result>> {
-        results
-            .into_future()
-            .map(|(f, _)| f.expect("no results should be impossible"))
-            .boxed()
-    }
-}
-
-#[derive(Clone)]
-pub struct MvccBatchGet {
-    pub keys: Vec<Key>,
-    pub version: u64,
-}
-
-impl KvRequest for MvccBatchGet {
-    type Result = Vec<KvPair>;
-    type RpcRequest = kvrpcpb::BatchGetRequest;
-    type RpcResponse = kvrpcpb::BatchGetResponse;
-    type Payload = Vec<Key>;
-    const REQUEST_NAME: &'static str = "kv_batch_get";
-    const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> =
-        TikvClient::kv_batch_get_async_opt;
-
-    fn into_request<KvC: KvClient>(
-        &self,
-        keys: Self::Payload,
-        store: &Store<KvC>,
-    ) -> Self::RpcRequest {
-        let mut req = store.request::<Self::RpcRequest>();
-        req.set_keys(keys.into_iter().map(Into::into).collect());
-        req.set_version(self.version);
-
-        req
-    }
-
-    fn store_stream<PdC: PdClient>(
-        &mut self,
-        pd_client: Arc<PdC>,
-    ) -> BoxStream<'static, Result<(Self::Payload, Store<PdC::KvClient>)>> {
-        let mut keys = Vec::new();
-        mem::swap(&mut keys, &mut self.keys);
-
-        pd_client
-            .clone()
-            .group_keys_by_region(keys.into_iter())
-            .and_then(move |(region_id, key)| {
-                pd_client
-                    .clone()
-                    .store_for_id(region_id)
-                    .map_ok(move |store| (key, store))
-            })
-            .boxed()
-    }
-
-    fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
-        resp.take_pairs().into_iter().map(Into::into).collect()
-    }
-
-    fn reduce(
-        results: BoxStream<'static, Result<Self::Result>>,
-    ) -> BoxFuture<'static, Result<Self::Result>> {
-        results.try_concat().boxed()
-    }
-}
-
-#[derive(Clone)]
-pub struct MvccScan {
-    pub range: BoundRange,
-    // TODO this limit is currently treated as a per-region limit, not a total
-    // limit.
-    pub limit: u32,
-    pub key_only: bool,
-    pub reverse: bool,
-    pub version: u64,
-}
-
-impl KvRequest for MvccScan {
-    type Result = Vec<KvPair>;
-    type RpcRequest = kvrpcpb::ScanRequest;
-    type RpcResponse = kvrpcpb::ScanResponse;
-    type Payload = (Key, Key);
-    const REQUEST_NAME: &'static str = "kv_scan";
-    const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> = TikvClient::kv_scan_async_opt;
-
-    fn into_request<KvC: KvClient>(
-        &self,
-        (start_key, end_key): Self::Payload,
-        store: &Store<KvC>,
-    ) -> Self::RpcRequest {
-        let mut req = store.request::<Self::RpcRequest>();
-        req.set_start_key(start_key.into());
-        req.set_end_key(end_key.into());
-        req.set_limit(self.limit);
-        req.set_key_only(self.key_only);
-        req.set_version(self.version);
-
-        req
-    }
-
-    fn store_stream<PdC: PdClient>(
-        &mut self,
-        _pd_client: Arc<PdC>,
-    ) -> BoxStream<'static, Result<(Self::Payload, Store<PdC::KvClient>)>> {
-        future::err(Error::unimplemented()).into_stream().boxed()
-    }
-
-    fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
-        resp.take_pairs().into_iter().map(Into::into).collect()
-    }
-
-    fn reduce(
-        results: BoxStream<'static, Result<Self::Result>>,
-    ) -> BoxFuture<'static, Result<Self::Result>> {
-        results.try_concat().boxed()
-    }
-}

+ 1 - 25
src/pd/client.rs

@@ -204,16 +204,12 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdRpcClient<KvC> {
 #[cfg(test)]
 pub mod test {
     use super::*;
-    use crate::kv_client::{
-        requests::{KvRequest, MockDispatch, RawScan},
-        KvClient,
-    };
+    use crate::kv_client::{KvClient, KvRequest};
     use crate::Error;
 
     use futures::executor;
     use futures::future::{ready, BoxFuture};
     use grpcio::CallOption;
-    use kvproto::kvrpcpb;
     use kvproto::metapb;
 
     // FIXME move all the mocks to their own module
@@ -229,26 +225,6 @@ pub mod test {
         }
     }
 
-    impl MockDispatch for RawScan {
-        fn mock_dispatch(
-            &self,
-            request: &kvrpcpb::RawScanRequest,
-            _opt: CallOption,
-        ) -> Option<BoxFuture<'static, Result<kvrpcpb::RawScanResponse>>> {
-            assert!(request.key_only);
-            assert_eq!(request.limit, 10);
-
-            let mut resp = kvrpcpb::RawScanResponse::default();
-            for i in request.start_key[0]..request.end_key[0] {
-                let mut kv = kvrpcpb::KvPair::default();
-                kv.key = vec![i];
-                resp.kvs.push(kv);
-            }
-
-            Some(Box::pin(ready(Ok(resp))))
-        }
-    }
-
     pub struct MockPdClient;
 
     impl PdClient for MockPdClient {

+ 2 - 2
src/raw/client.rs

@@ -1,8 +1,8 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
-use super::ColumnFamily;
+use super::{requests::*, ColumnFamily};
 use crate::{
-    kv_client::requests::*, pd::PdRpcClient, BoundRange, Config, Error, Key, KvPair, Result, Value,
+    kv_client::{KvRequest}, pd::PdRpcClient, BoundRange, Config, Error, Key, KvPair, Result, Value,
 };
 
 use futures::future::Either;

+ 1 - 0
src/raw/mod.rs

@@ -15,6 +15,7 @@ pub use self::client::Client;
 use std::fmt;
 
 mod client;
+mod requests;
 
 /// A [`ColumnFamily`](ColumnFamily) is an optional parameter for [`raw::Client`](Client) requests.
 ///

+ 25 - 1
src/kv_client/requests/raw.rs

@@ -1,7 +1,7 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
 use crate::{
-    kv_client::{requests::KvRequest, KvClient, RpcFnType, Store},
+    kv_client::{KvClient, KvRequest, RpcFnType, Store},
     pd::PdClient,
     raw::ColumnFamily,
     BoundRange, Error, Key, KvPair, Result, Value,
@@ -575,8 +575,32 @@ impl_raw_rpc_request!(RawDeleteRangeRequest);
 #[cfg(test)]
 mod test {
     use super::*;
+    use crate::kv_client::{KvRequest, MockDispatch};
     use crate::pd::MockPdClient;
     use futures::executor;
+    use futures::future::{ready, BoxFuture};
+    use grpcio::CallOption;
+    use kvproto::kvrpcpb;
+
+    impl MockDispatch for RawScan {
+        fn mock_dispatch(
+            &self,
+            request: &kvrpcpb::RawScanRequest,
+            _opt: CallOption,
+        ) -> Option<BoxFuture<'static, Result<kvrpcpb::RawScanResponse>>> {
+            assert!(request.key_only);
+            assert_eq!(request.limit, 10);
+
+            let mut resp = kvrpcpb::RawScanResponse::default();
+            for i in request.start_key[0]..request.end_key[0] {
+                let mut kv = kvrpcpb::KvPair::default();
+                kv.key = vec![i];
+                resp.kvs.push(kv);
+            }
+
+            Some(Box::pin(ready(Ok(resp))))
+        }
+    }
 
     #[test]
     #[ignore]

+ 8 - 6
src/transaction/mod.rs

@@ -10,11 +10,13 @@
 //!
 
 pub use self::client::{Client, Connect};
-pub use self::requests::Scanner;
 
-use crate::{Key, Result, Value};
+use crate::{Key, KvPair, Result, Value};
+
 use derive_new::new;
+use futures::stream::BoxStream;
 use kvproto::kvrpcpb;
+
 use std::{collections::BTreeMap, ops::RangeBounds};
 
 mod client;
@@ -166,11 +168,11 @@ impl Transaction {
             }))
     }
 
-    pub fn scan(&self, _range: impl RangeBounds<Key>) -> Scanner {
+    pub fn scan(&self, _range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {
         unimplemented!()
     }
 
-    pub fn scan_reverse(&self, _range: impl RangeBounds<Key>) -> Scanner {
+    pub fn scan_reverse(&self, _range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {
         unimplemented!()
     }
 
@@ -381,12 +383,12 @@ impl Snapshot {
         Ok(std::iter::repeat_with(|| unimplemented!()))
     }
 
-    pub fn scan(&self, range: impl RangeBounds<Key>) -> Scanner {
+    pub fn scan(&self, range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {
         drop(range);
         unimplemented!()
     }
 
-    pub fn scan_reverse(&self, range: impl RangeBounds<Key>) -> Scanner {
+    pub fn scan_reverse(&self, range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {
         drop(range);
         unimplemented!()
     }

+ 171 - 13
src/transaction/requests.rs

@@ -1,20 +1,178 @@
-// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+use crate::{
+    kv_client::{KvRequest, KvClient, RpcFnType, Store},
+    pd::PdClient,
+    BoundRange, Error, Key, KvPair, Result, Value,
+};
+use futures::future::BoxFuture;
+use futures::prelude::*;
+use futures::stream::BoxStream;
+use kvproto::kvrpcpb;
+use kvproto::tikvpb::TikvClient;
+use std::mem;
+use std::sync::Arc;
 
-use crate::{Error, KvPair};
+#[derive(Clone)]
+pub struct MvccGet {
+    pub key: Key,
+    pub version: u64,
+}
 
-use futures::prelude::*;
-use futures::task::{Context, Poll};
-use std::pin::Pin;
+impl KvRequest for MvccGet {
+    type Result = Option<Value>;
+    type RpcRequest = kvrpcpb::GetRequest;
+    type RpcResponse = kvrpcpb::GetResponse;
+    type Payload = Key;
+    const REQUEST_NAME: &'static str = "kv_get";
+    const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> = TikvClient::kv_get_async_opt;
+
+    fn into_request<KvC: KvClient>(
+        &self,
+        key: Self::Payload,
+        store: &Store<KvC>,
+    ) -> Self::RpcRequest {
+        let mut req = store.request::<Self::RpcRequest>();
+        req.set_key(key.into());
+        req.set_version(self.version);
+
+        req
+    }
+
+    fn store_stream<PdC: PdClient>(
+        &mut self,
+        pd_client: Arc<PdC>,
+    ) -> BoxStream<'static, Result<(Self::Payload, Store<PdC::KvClient>)>> {
+        let key = self.key.clone();
+        pd_client
+            .store_for_key(&self.key)
+            .map_ok(move |store| (key, store))
+            .into_stream()
+            .boxed()
+    }
+
+    fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
+        let result: Value = resp.take_value().into();
+        if resp.not_found {
+            None
+        } else {
+            Some(result)
+        }
+    }
 
-/// An unresolved [`Transaction::scan`](Transaction::scan) request.
-///
-/// Once resolved this request will result in a scanner over the given keys.
-pub struct Scanner;
+    fn reduce(
+        results: BoxStream<'static, Result<Self::Result>>,
+    ) -> BoxFuture<'static, Result<Self::Result>> {
+        results
+            .into_future()
+            .map(|(f, _)| f.expect("no results should be impossible"))
+            .boxed()
+    }
+}
+
+#[derive(Clone)]
+pub struct MvccBatchGet {
+    pub keys: Vec<Key>,
+    pub version: u64,
+}
+
+impl KvRequest for MvccBatchGet {
+    type Result = Vec<KvPair>;
+    type RpcRequest = kvrpcpb::BatchGetRequest;
+    type RpcResponse = kvrpcpb::BatchGetResponse;
+    type Payload = Vec<Key>;
+    const REQUEST_NAME: &'static str = "kv_batch_get";
+    const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> =
+        TikvClient::kv_batch_get_async_opt;
+
+    fn into_request<KvC: KvClient>(
+        &self,
+        keys: Self::Payload,
+        store: &Store<KvC>,
+    ) -> Self::RpcRequest {
+        let mut req = store.request::<Self::RpcRequest>();
+        req.set_keys(keys.into_iter().map(Into::into).collect());
+        req.set_version(self.version);
+
+        req
+    }
 
-impl Stream for Scanner {
-    type Item = Result<KvPair, Error>;
+    fn store_stream<PdC: PdClient>(
+        &mut self,
+        pd_client: Arc<PdC>,
+    ) -> BoxStream<'static, Result<(Self::Payload, Store<PdC::KvClient>)>> {
+        let mut keys = Vec::new();
+        mem::swap(&mut keys, &mut self.keys);
+
+        pd_client
+            .clone()
+            .group_keys_by_region(keys.into_iter())
+            .and_then(move |(region_id, key)| {
+                pd_client
+                    .clone()
+                    .store_for_id(region_id)
+                    .map_ok(move |store| (key, store))
+            })
+            .boxed()
+    }
+
+    fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
+        resp.take_pairs().into_iter().map(Into::into).collect()
+    }
+
+    fn reduce(
+        results: BoxStream<'static, Result<Self::Result>>,
+    ) -> BoxFuture<'static, Result<Self::Result>> {
+        results.try_concat().boxed()
+    }
+}
+
+#[derive(Clone)]
+pub struct MvccScan {
+    pub range: BoundRange,
+    // TODO this limit is currently treated as a per-region limit, not a total
+    // limit.
+    pub limit: u32,
+    pub key_only: bool,
+    pub reverse: bool,
+    pub version: u64,
+}
+
+impl KvRequest for MvccScan {
+    type Result = Vec<KvPair>;
+    type RpcRequest = kvrpcpb::ScanRequest;
+    type RpcResponse = kvrpcpb::ScanResponse;
+    type Payload = (Key, Key);
+    const REQUEST_NAME: &'static str = "kv_scan";
+    const RPC_FN: RpcFnType<Self::RpcRequest, Self::RpcResponse> = TikvClient::kv_scan_async_opt;
+
+    fn into_request<KvC: KvClient>(
+        &self,
+        (start_key, end_key): Self::Payload,
+        store: &Store<KvC>,
+    ) -> Self::RpcRequest {
+        let mut req = store.request::<Self::RpcRequest>();
+        req.set_start_key(start_key.into());
+        req.set_end_key(end_key.into());
+        req.set_limit(self.limit);
+        req.set_key_only(self.key_only);
+        req.set_version(self.version);
+
+        req
+    }
+
+    fn store_stream<PdC: PdClient>(
+        &mut self,
+        _pd_client: Arc<PdC>,
+    ) -> BoxStream<'static, Result<(Self::Payload, Store<PdC::KvClient>)>> {
+        future::err(Error::unimplemented()).into_stream().boxed()
+    }
+
+    fn map_result(mut resp: Self::RpcResponse) -> Self::Result {
+        resp.take_pairs().into_iter().map(Into::into).collect()
+    }
 
-    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
-        unimplemented!()
+    fn reduce(
+        results: BoxStream<'static, Result<Self::Result>>,
+    ) -> BoxFuture<'static, Result<Self::Result>> {
+        results.try_concat().boxed()
     }
 }